state: refactor some node/coordinate state store functions to take an EnterpriseMeta (#10687)

Note the field is not used yet.
This commit is contained in:
R.B. Boyer 2021-07-23 13:42:23 -05:00 committed by GitHub
parent c4911cc3ba
commit c271976445
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 184 additions and 130 deletions

View file

@ -298,10 +298,11 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
var err error var err error
// TODO(partitions)
if len(args.NodeMetaFilters) > 0 { if len(args.NodeMetaFilters) > 0 {
reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters) reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters, nil)
} else { } else {
reply.Index, reply.Nodes, err = state.Nodes(ws) reply.Index, reply.Nodes, err = state.Nodes(ws, nil)
} }
if err != nil { if err != nil {
return err return err

View file

@ -199,7 +199,8 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
return c.srv.blockingQuery(&args.QueryOptions, return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
index, coords, err := state.Coordinates(ws) // TODO(partitions)
index, coords, err := state.Coordinates(ws, nil)
if err != nil { if err != nil {
return err return err
} }
@ -236,7 +237,8 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde
return c.srv.blockingQuery(&args.QueryOptions, return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
index, nodeCoords, err := state.Coordinate(args.Node, ws) // TODO(partitions)
index, nodeCoords, err := state.Coordinate(ws, args.Node, nil)
if err != nil { if err != nil {
return err return err
} }

View file

@ -10,14 +10,15 @@ import (
"testing" "testing"
"time" "time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/coordinate"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/coordinate"
"github.com/stretchr/testify/require"
) )
// generateRandomCoordinate creates a random coordinate. This mucks with the // generateRandomCoordinate creates a random coordinate. This mucks with the
@ -83,13 +84,15 @@ func TestCoordinate_Update(t *testing.T) {
// Make sure the updates did not yet apply because the update period // Make sure the updates did not yet apply because the update period
// hasn't expired. // hasn't expired.
state := s1.fsm.State() state := s1.fsm.State()
_, c, err := state.Coordinate("node1", nil) // TODO(partitions)
_, c, err := state.Coordinate(nil, "node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
require.Equal(t, lib.CoordinateSet{}, c) require.Equal(t, lib.CoordinateSet{}, c)
_, c, err = state.Coordinate("node2", nil) // TODO(partitions)
_, c, err = state.Coordinate(nil, "node2", nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -104,7 +107,8 @@ func TestCoordinate_Update(t *testing.T) {
// Wait a while and the updates should get picked up. // Wait a while and the updates should get picked up.
time.Sleep(3 * s1.config.CoordinateUpdatePeriod) time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
_, c, err = state.Coordinate("node1", nil) // TODO(partitions)
_, c, err = state.Coordinate(nil, "node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -113,7 +117,8 @@ func TestCoordinate_Update(t *testing.T) {
} }
require.Equal(t, expected, c) require.Equal(t, expected, c)
_, c, err = state.Coordinate("node2", nil) // TODO(partitions)
_, c, err = state.Coordinate(nil, "node2", nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -152,7 +157,8 @@ func TestCoordinate_Update(t *testing.T) {
time.Sleep(3 * s1.config.CoordinateUpdatePeriod) time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
numDropped := 0 numDropped := 0
for i := 0; i < spamLen; i++ { for i := 0; i < spamLen; i++ {
_, c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i), nil) // TODO(partitions)
_, c, err = state.Coordinate(nil, fmt.Sprintf("bogusnode%d", i), nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }

View file

@ -169,7 +169,7 @@ func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {
return err return err
} }
} else { } else {
if err := c.state.DeleteNode(index, req.Node); err != nil { if err := c.state.DeleteNode(index, req.Node, &req.EnterpriseMeta); err != nil {
c.logger.Warn("DeleteNode failed", "error", err) c.logger.Warn("DeleteNode failed", "error", err)
return err return err
} }

View file

@ -68,7 +68,7 @@ func TestFSM_RegisterNode(t *testing.T) {
} }
// Verify we are registered // Verify we are registered
_, node, err := fsm.state.GetNode("foo") _, node, err := fsm.state.GetNode("foo", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -126,7 +126,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
} }
// Verify we are registered // Verify we are registered
_, node, err := fsm.state.GetNode("foo") _, node, err := fsm.state.GetNode("foo", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -198,7 +198,7 @@ func TestFSM_DeregisterService(t *testing.T) {
} }
// Verify we are registered // Verify we are registered
_, node, err := fsm.state.GetNode("foo") _, node, err := fsm.state.GetNode("foo", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -261,7 +261,7 @@ func TestFSM_DeregisterCheck(t *testing.T) {
} }
// Verify we are registered // Verify we are registered
_, node, err := fsm.state.GetNode("foo") _, node, err := fsm.state.GetNode("foo", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -330,7 +330,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
} }
// Verify we are not registered // Verify we are not registered
_, node, err := fsm.state.GetNode("foo") _, node, err := fsm.state.GetNode("foo", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -730,7 +730,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
} }
// Read back the two coordinates to make sure they got updated. // Read back the two coordinates to make sure they got updated.
_, coords, err := fsm.state.Coordinates(nil) _, coords, err := fsm.state.Coordinates(nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -1543,7 +1543,7 @@ func TestFSM_Chunking_Lifecycle(t *testing.T) {
// Verify we are not registered // Verify we are not registered
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
_, node, err := fsm.state.GetNode(fmt.Sprintf("foo%d", i)) _, node, err := fsm.state.GetNode(fmt.Sprintf("foo%d", i), nil)
require.NoError(err) require.NoError(err)
assert.Nil(node) assert.Nil(node)
} }
@ -1566,7 +1566,7 @@ func TestFSM_Chunking_Lifecycle(t *testing.T) {
// Verify we are still not registered // Verify we are still not registered
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
_, node, err := fsm2.state.GetNode(fmt.Sprintf("foo%d", i)) _, node, err := fsm2.state.GetNode(fmt.Sprintf("foo%d", i), nil)
require.NoError(err) require.NoError(err)
assert.Nil(node) assert.Nil(node)
} }
@ -1590,7 +1590,7 @@ func TestFSM_Chunking_Lifecycle(t *testing.T) {
// Verify we are registered // Verify we are registered
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
_, node, err := fsm2.state.GetNode(fmt.Sprintf("foo%d", i)) _, node, err := fsm2.state.GetNode(fmt.Sprintf("foo%d", i), nil)
require.NoError(err) require.NoError(err)
assert.NotNil(node) assert.NotNil(node)

View file

@ -115,7 +115,8 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
} }
// Register each service this node has // Register each service this node has
services, err := s.state.Services(n.Node) // TODO(partitions)
services, err := s.state.Services(n.Node, nil)
if err != nil { if err != nil {
return err return err
} }
@ -131,7 +132,8 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
// Register each check this node has // Register each check this node has
req.Service = nil req.Service = nil
checks, err := s.state.Checks(n.Node) // TODO(partitions)
checks, err := s.state.Checks(n.Node, nil)
if err != nil { if err != nil {
return err return err
} }
@ -153,6 +155,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
if err != nil { if err != nil {
return err return err
} }
// TODO(partitions)
for coord := coords.Next(); coord != nil; coord = coords.Next() { for coord := coords.Next(); coord != nil; coord = coords.Next() {
if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil { if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil {
return err return err

View file

@ -489,7 +489,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, fsm2.Restore(sink)) require.NoError(t, fsm2.Restore(sink))
// Verify the contents // Verify the contents
_, nodes, err := fsm2.state.Nodes(nil) _, nodes, err := fsm2.state.Nodes(nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, nodes, 2, "incorect number of nodes: %v", nodes) require.Len(t, nodes, 2, "incorect number of nodes: %v", nodes)
@ -625,7 +625,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
}() }()
// Verify coordinates are restored // Verify coordinates are restored
_, coords, err := fsm2.state.Coordinates(nil) _, coords, err := fsm2.state.Coordinates(nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, updates, coords) require.Equal(t, updates, coords)
@ -749,7 +749,7 @@ func TestFSM_BadRestore_OSS(t *testing.T) {
require.Error(t, fsm.Restore(sink)) require.Error(t, fsm.Restore(sink))
// Verify the contents didn't get corrupted. // Verify the contents didn't get corrupted.
_, nodes, err := fsm.state.Nodes(nil) _, nodes, err := fsm.state.Nodes(nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, nodes, 1) require.Len(t, nodes, 1)
require.Equal(t, "foo", nodes[0].Node) require.Equal(t, "foo", nodes[0].Node)

View file

@ -1139,7 +1139,8 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
CHECKS: CHECKS:
for _, service := range services.Services { for _, service := range services.Services {
if service.ID == structs.ConsulServiceID { if service.ID == structs.ConsulServiceID {
_, node, err := state.GetNode(check.Node) // TODO(partitions)
_, node, err := state.GetNode(check.Node, nil)
if err != nil { if err != nil {
s.logger.Error("Unable to look up node with name", "name", check.Node, "error", err) s.logger.Error("Unable to look up node with name", "name", check.Node, "error", err)
continue CHECKS continue CHECKS
@ -1262,7 +1263,8 @@ func (s *Server) handleAliveMember(member serf.Member) error {
// Check if the node exists // Check if the node exists
state := s.fsm.State() state := s.fsm.State()
_, node, err := state.GetNode(member.Name) // TODO(partitions)
_, node, err := state.GetNode(member.Name, nil)
if err != nil { if err != nil {
return err return err
} }
@ -1330,7 +1332,8 @@ AFTER_CHECK:
func (s *Server) handleFailedMember(member serf.Member) error { func (s *Server) handleFailedMember(member serf.Member) error {
// Check if the node exists // Check if the node exists
state := s.fsm.State() state := s.fsm.State()
_, node, err := state.GetNode(member.Name) // TODO(partitions)
_, node, err := state.GetNode(member.Name, nil)
if err != nil { if err != nil {
return err return err
} }
@ -1407,7 +1410,8 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
// Check if the node does not exist // Check if the node does not exist
state := s.fsm.State() state := s.fsm.State()
_, node, err := state.GetNode(member.Name) // TODO(partitions)
_, node, err := state.GetNode(member.Name, nil)
if err != nil { if err != nil {
return err return err
} }

View file

@ -9,16 +9,17 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token" tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
) )
func TestLeader_RegisterMember(t *testing.T) { func TestLeader_RegisterMember(t *testing.T) {
@ -48,7 +49,8 @@ func TestLeader_RegisterMember(t *testing.T) {
// Client should be registered // Client should be registered
state := s1.fsm.State() state := s1.fsm.State()
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -77,7 +79,8 @@ func TestLeader_RegisterMember(t *testing.T) {
// Server should be registered // Server should be registered
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(s1.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -126,7 +129,8 @@ func TestLeader_FailedMember(t *testing.T) {
// Should be registered // Should be registered
state := s1.fsm.State() state := s1.fsm.State()
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -187,7 +191,8 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be registered // Should be registered
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -202,7 +207,8 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be deregistered // Should be deregistered
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -237,7 +243,8 @@ func TestLeader_ReapMember(t *testing.T) {
// Should be registered // Should be registered
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -262,7 +269,8 @@ func TestLeader_ReapMember(t *testing.T) {
// anti-entropy will put it back. // anti-entropy will put it back.
reaped := false reaped := false
for start := time.Now(); time.Since(start) < 5*time.Second; { for start := time.Now(); time.Since(start) < 5*time.Second; {
_, node, err := state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -431,7 +439,8 @@ func TestLeader_ReapServer(t *testing.T) {
// s3 should be registered // s3 should be registered
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(s3.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -452,7 +461,8 @@ func TestLeader_ReapServer(t *testing.T) {
} }
// s3 should be deregistered // s3 should be deregistered
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(s3.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -507,7 +517,8 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) {
// Node should be gone // Node should be gone
state := s1.fsm.State() state := s1.fsm.State()
_, node, err := state.GetNode("no-longer-around") // TODO(partitions)
_, node, err := state.GetNode("no-longer-around", nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -540,7 +551,8 @@ func TestLeader_Reconcile(t *testing.T) {
// Should not be registered // Should not be registered
state := s1.fsm.State() state := s1.fsm.State()
_, node, err := state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -550,7 +562,8 @@ func TestLeader_Reconcile(t *testing.T) {
// Should be registered // Should be registered
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -582,7 +595,8 @@ func TestLeader_Reconcile_Races(t *testing.T) {
state := s1.fsm.State() state := s1.fsm.State()
var nodeAddr string var nodeAddr string
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -618,7 +632,8 @@ func TestLeader_Reconcile_Races(t *testing.T) {
if err := s1.reconcile(); err != nil { if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
_, node, err := state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -642,7 +657,8 @@ func TestLeader_Reconcile_Races(t *testing.T) {
}) })
// Make sure the metadata didn't get clobbered. // Make sure the metadata didn't get clobbered.
_, node, err = state.GetNode(c1.config.NodeName) // TODO(partitions)
_, node, err = state.GetNode(c1.config.NodeName, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -757,7 +773,8 @@ func TestLeader_LeftLeader(t *testing.T) {
// Verify the old leader is deregistered // Verify the old leader is deregistered
state := remain.fsm.State() state := remain.fsm.State()
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(leader.config.NodeName) // TODO(partitions)
_, node, err := state.GetNode(leader.config.NodeName, nil)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }

View file

@ -402,7 +402,8 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
qs.Node = args.Agent.Node qs.Node = args.Agent.Node
} else if qs.Node == "_ip" { } else if qs.Node == "_ip" {
if args.Source.Ip != "" { if args.Source.Ip != "" {
_, nodes, err := state.Nodes(nil) // TODO(partitions)
_, nodes, err := state.Nodes(nil, nil)
if err != nil { if err != nil {
return err return err
} }

View file

@ -22,7 +22,8 @@ func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort.
state := s.fsm.State() state := s.fsm.State()
vec := make([]float64, len(nodes)) vec := make([]float64, len(nodes))
for i, node := range nodes { for i, node := range nodes {
_, other, err := state.Coordinate(node.Node, nil) // TODO(partitions)
_, other, err := state.Coordinate(nil, node.Node, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -62,7 +63,8 @@ func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.Servic
state := s.fsm.State() state := s.fsm.State()
vec := make([]float64, len(nodes)) vec := make([]float64, len(nodes))
for i, node := range nodes { for i, node := range nodes {
_, other, err := state.Coordinate(node.Node, nil) // TODO(partitions)
_, other, err := state.Coordinate(nil, node.Node, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -102,7 +104,8 @@ func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.Healt
state := s.fsm.State() state := s.fsm.State()
vec := make([]float64, len(checks)) vec := make([]float64, len(checks))
for i, check := range checks { for i, check := range checks {
_, other, err := state.Coordinate(check.Node, nil) // TODO(partitions)
_, other, err := state.Coordinate(nil, check.Node, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -142,7 +145,8 @@ func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.C
state := s.fsm.State() state := s.fsm.State()
vec := make([]float64, len(nodes)) vec := make([]float64, len(nodes))
for i, node := range nodes { for i, node := range nodes {
_, other, err := state.Coordinate(node.Node.Node, nil) // TODO(partitions)
_, other, err := state.Coordinate(nil, node.Node.Node, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -203,7 +207,7 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
// There won't always be coordinates for the source node. If there are // There won't always be coordinates for the source node. If there are
// none then we can bail out because there's no meaning for the sort. // none then we can bail out because there's no meaning for the sort.
state := s.fsm.State() state := s.fsm.State()
_, cs, err := state.Coordinate(source.Node, nil) _, cs, err := state.Coordinate(nil, source.Node, source.NodeEnterpriseMeta())
if err != nil { if err != nil {
return err return err
} }

View file

@ -49,13 +49,15 @@ func (s *Snapshot) Nodes() (memdb.ResultIterator, error) {
// Services is used to pull the full list of services for a given node for use // Services is used to pull the full list of services for a given node for use
// during snapshots. // during snapshots.
func (s *Snapshot) Services(node string) (memdb.ResultIterator, error) { func (s *Snapshot) Services(node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// TODO(partitions): use the provided entmeta
return s.tx.Get(tableServices, indexNode, Query{Value: node}) return s.tx.Get(tableServices, indexNode, Query{Value: node})
} }
// Checks is used to pull the full list of checks for a given node for use // Checks is used to pull the full list of checks for a given node for use
// during snapshots. // during snapshots.
func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) { func (s *Snapshot) Checks(node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// TODO(partitions): use the provided entmeta
return s.tx.Get(tableChecks, indexNode, Query{Value: node}) return s.tx.Get(tableChecks, indexNode, Query{Value: node})
} }
@ -329,7 +331,8 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
} }
// GetNode is used to retrieve a node registration by node name ID. // GetNode is used to retrieve a node registration by node name ID.
func (s *Store) GetNode(id string) (uint64, *structs.Node, error) { func (s *Store) GetNode(nodeNameOrID string, _ *structs.EnterpriseMeta) (uint64, *structs.Node, error) {
// TODO(partitions): use the provided entmeta
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
@ -337,7 +340,7 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
idx := maxIndexTxn(tx, "nodes") idx := maxIndexTxn(tx, "nodes")
// Retrieve the node from the state store // Retrieve the node from the state store
node, err := getNodeTxn(tx, id) node, err := getNodeTxn(tx, nodeNameOrID)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("node lookup failed: %s", err) return 0, nil, fmt.Errorf("node lookup failed: %s", err)
} }
@ -386,7 +389,8 @@ func (s *Store) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) {
} }
// Nodes is used to return all of the known nodes. // Nodes is used to return all of the known nodes.
func (s *Store) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) { func (s *Store) Nodes(ws memdb.WatchSet, _ *structs.EnterpriseMeta) (uint64, structs.Nodes, error) {
// TODO(partitions): use the provided entmeta
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
@ -409,7 +413,8 @@ func (s *Store) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
} }
// NodesByMeta is used to return all nodes with the given metadata key/value pairs. // NodesByMeta is used to return all nodes with the given metadata key/value pairs.
func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Nodes, error) { func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string, _ *structs.EnterpriseMeta) (uint64, structs.Nodes, error) {
// TODO(partitions): use the provided entmeta
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
@ -440,7 +445,8 @@ func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint6
} }
// DeleteNode is used to delete a given node by its ID. // DeleteNode is used to delete a given node by its ID.
func (s *Store) DeleteNode(idx uint64, nodeName string) error { func (s *Store) DeleteNode(idx uint64, nodeName string, _ *structs.EnterpriseMeta) error {
// TODO(partitions): use the provided entmeta
tx := s.db.WriteTxn(idx) tx := s.db.WriteTxn(idx)
defer tx.Abort() defer tx.Abort()

View file

@ -176,7 +176,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 1}, RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 1},
} }
_, out, err := s.GetNode("node1") _, out, err := s.GetNode("node1", nil)
if err != nil { if err != nil {
t.Fatalf("got err %s want nil", err) t.Fatalf("got err %s want nil", err)
} }
@ -392,7 +392,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
// Retrieve the node and verify its contents. // Retrieve the node and verify its contents.
verifyNode := func(nodeLookup string) { verifyNode := func(nodeLookup string) {
_, out, err := s.GetNode(nodeLookup) _, out, err := s.GetNode(nodeLookup, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -550,7 +550,7 @@ func deprecatedEnsureNodeWithoutIDCanRegister(t *testing.T, s *Store, nodeName s
if err := s.EnsureNode(txIdx, in); err != nil { if err := s.EnsureNode(txIdx, in); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
idx, out, err := s.GetNode(nodeName) idx, out, err := s.GetNode(nodeName, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -580,7 +580,7 @@ func TestStateStore_EnsureNodeDeprecated(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Retrieve the node again // Retrieve the node again
idx, out, err := s.GetNode(firstNodeName) idx, out, err := s.GetNode(firstNodeName, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -603,7 +603,7 @@ func TestStateStore_EnsureNodeDeprecated(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Retrieve the node again // Retrieve the node again
idx, out, err = s.GetNode(firstNodeName) idx, out, err = s.GetNode(firstNodeName, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -725,7 +725,7 @@ func TestNodeRenamingNodes(t *testing.T) {
} }
// Retrieve the node again // Retrieve the node again
idx, out, err := s.GetNode("node2bis") idx, out, err := s.GetNode("node2bis", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -749,7 +749,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Fetching a non-existent node returns nil // Fetching a non-existent node returns nil
if _, node, err := s.GetNode("node1"); node != nil || err != nil { if _, node, err := s.GetNode("node1", nil); node != nil || err != nil {
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", node, err) t.Fatalf("expected (nil, nil), got: (%#v, %#v)", node, err)
} }
@ -766,7 +766,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
} }
// Retrieve the node again // Retrieve the node again
idx, out, err := s.GetNode("node1") idx, out, err := s.GetNode("node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -795,7 +795,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
} }
// Retrieve the node // Retrieve the node
idx, out, err = s.GetNode("node1") idx, out, err = s.GetNode("node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -812,7 +812,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
if err := s.EnsureNode(3, in2); err != nil { if err := s.EnsureNode(3, in2); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
_, out, err = s.GetNode("node1") _, out, err = s.GetNode("node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -829,7 +829,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
if err := s.EnsureNode(3, in3); err != nil { if err := s.EnsureNode(3, in3); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
idx, out, err = s.GetNode("node1") idx, out, err = s.GetNode("node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -856,13 +856,13 @@ func TestStateStore_EnsureNode(t *testing.T) {
} }
// Retrieve the node // Retrieve the node
_, out, err = s.GetNode("node1") _, out, err = s.GetNode("node1", nil)
require.NoError(t, err) require.NoError(t, err)
if out != nil { if out != nil {
t.Fatalf("Node should not exist anymore: %q", out) t.Fatalf("Node should not exist anymore: %q", out)
} }
idx, out, err = s.GetNode("node1-renamed") idx, out, err = s.GetNode("node1-renamed", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -921,7 +921,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
} }
// Retrieve the node // Retrieve the node
_, out, err = s.GetNode("Node1bis") _, out, err = s.GetNode("Node1bis", nil)
require.NoError(t, err) require.NoError(t, err)
if out == nil { if out == nil {
t.Fatalf("Node should exist, but was null") t.Fatalf("Node should exist, but was null")
@ -937,7 +937,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
idx, out, err = s.GetNode("Node1bis") idx, out, err = s.GetNode("Node1bis", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -980,7 +980,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
if err := s.EnsureNode(12, in); err != nil { if err := s.EnsureNode(12, in); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
idx, out, err = s.GetNode("Node1-Renamed2") idx, out, err = s.GetNode("Node1-Renamed2", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -1010,7 +1010,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
if err := s.EnsureNode(15, in); err != nil { if err := s.EnsureNode(15, in); err != nil {
t.Fatalf("[DEPRECATED] it should work, err:= %q", err) t.Fatalf("[DEPRECATED] it should work, err:= %q", err)
} }
_, out, err = s.GetNode("Node1-Renamed2") _, out, err = s.GetNode("Node1-Renamed2", nil)
if err != nil { if err != nil {
t.Fatalf("[DEPRECATED] err: %s", err) t.Fatalf("[DEPRECATED] err: %s", err)
} }
@ -1027,7 +1027,7 @@ func TestStateStore_GetNodes(t *testing.T) {
// Listing with no results returns nil. // Listing with no results returns nil.
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
idx, res, err := s.Nodes(ws) idx, res, err := s.Nodes(ws, nil)
if idx != 0 || res != nil || err != nil { if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
} }
@ -1042,7 +1042,7 @@ func TestStateStore_GetNodes(t *testing.T) {
// Retrieve the nodes. // Retrieve the nodes.
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, nodes, err := s.Nodes(ws) idx, nodes, err := s.Nodes(ws, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -1072,7 +1072,7 @@ func TestStateStore_GetNodes(t *testing.T) {
if watchFired(ws) { if watchFired(ws) {
t.Fatalf("bad") t.Fatalf("bad")
} }
if err := s.DeleteNode(3, "node1"); err != nil { if err := s.DeleteNode(3, "node1", nil); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if !watchFired(ws) { if !watchFired(ws) {
@ -1092,7 +1092,7 @@ func BenchmarkGetNodes(b *testing.B) {
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
s.Nodes(ws) s.Nodes(ws, nil)
} }
} }
@ -1101,7 +1101,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) {
// Listing with no results returns nil // Listing with no results returns nil
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
idx, res, err := s.NodesByMeta(ws, map[string]string{"somekey": "somevalue"}) idx, res, err := s.NodesByMeta(ws, map[string]string{"somekey": "somevalue"}, nil)
if idx != 0 || res != nil || err != nil { if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
} }
@ -1141,7 +1141,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) {
} }
for _, tc := range cases { for _, tc := range cases {
_, result, err := s.NodesByMeta(nil, tc.filters) _, result, err := s.NodesByMeta(nil, tc.filters, nil)
if err != nil { if err != nil {
t.Fatalf("bad: %v", err) t.Fatalf("bad: %v", err)
} }
@ -1159,7 +1159,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) {
// Set up a watch. // Set up a watch.
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
_, _, err = s.NodesByMeta(ws, map[string]string{"role": "client"}) _, _, err = s.NodesByMeta(ws, map[string]string{"role": "client"}, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -1285,12 +1285,12 @@ func TestStateStore_DeleteNode(t *testing.T) {
testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing) testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
// Delete the node // Delete the node
if err := s.DeleteNode(3, "node1"); err != nil { if err := s.DeleteNode(3, "node1", nil); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// The node was removed // The node was removed
if idx, n, err := s.GetNode("node1"); err != nil || n != nil || idx != 3 { if idx, n, err := s.GetNode("node1", nil); err != nil || n != nil || idx != 3 {
t.Fatalf("bad: %#v %d (err: %#v)", n, idx, err) t.Fatalf("bad: %#v %d (err: %#v)", n, idx, err)
} }
@ -1324,7 +1324,7 @@ func TestStateStore_DeleteNode(t *testing.T) {
// Deleting a nonexistent node should be idempotent and not return // Deleting a nonexistent node should be idempotent and not return
// an error // an error
if err := s.DeleteNode(4, "node1"); err != nil { if err := s.DeleteNode(4, "node1", nil); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx := s.maxIndex("nodes"); idx != 3 { if idx := s.maxIndex("nodes"); idx != 3 {
@ -1618,7 +1618,7 @@ func TestStateStore_Services(t *testing.T) {
} }
// Deleting a node with a service should fire the watch. // Deleting a node with a service should fire the watch.
if err := s.DeleteNode(6, "node1"); err != nil { if err := s.DeleteNode(6, "node1", nil); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if !watchFired(ws) { if !watchFired(ws) {
@ -1868,7 +1868,7 @@ func TestStateStore_ServiceNodes(t *testing.T) {
} }
// But removing a node with the "db" service should fire the watch. // But removing a node with the "db" service should fire the watch.
if err := s.DeleteNode(18, "bar"); err != nil { if err := s.DeleteNode(18, "bar", nil); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if !watchFired(ws) { if !watchFired(ws) {
@ -1972,7 +1972,7 @@ func TestStateStore_ServiceTagNodes(t *testing.T) {
} }
// But removing a node with the "db:primary" service should fire the watch. // But removing a node with the "db:primary" service should fire the watch.
if err := s.DeleteNode(21, "foo"); err != nil { if err := s.DeleteNode(21, "foo", nil); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if !watchFired(ws) { if !watchFired(ws) {
@ -2134,7 +2134,7 @@ func TestStateStore_ConnectServiceNodes(t *testing.T) {
assert.False(watchFired(ws)) assert.False(watchFired(ws))
// But removing a node with the "db" service should fire the watch. // But removing a node with the "db" service should fire the watch.
assert.Nil(s.DeleteNode(18, "bar")) assert.Nil(s.DeleteNode(18, "bar", nil))
assert.True(watchFired(ws)) assert.True(watchFired(ws))
} }
@ -2293,7 +2293,7 @@ func TestStateStore_Service_Snapshot(t *testing.T) {
if idx := snap.LastIndex(); idx != 4 { if idx := snap.LastIndex(); idx != 4 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
services, err := snap.Services("node1") services, err := snap.Services("node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -3875,7 +3875,7 @@ func TestStateStore_Check_Snapshot(t *testing.T) {
if idx := snap.LastIndex(); idx != 5 { if idx := snap.LastIndex(); idx != 5 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
iter, err := snap.Checks("node1") iter, err := snap.Checks("node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -4131,7 +4131,7 @@ func TestStateStore_ServiceDump(t *testing.T) {
{ {
name: "delete a node", name: "delete a node",
modFn: func(t *testing.T) { modFn: func(t *testing.T) {
s.DeleteNode(12, "node2") s.DeleteNode(12, "node2", nil)
}, },
allFired: true, // fires due to "index" allFired: true, // fires due to "index"
kindFired: true, // fires due to "index" kindFired: true, // fires due to "index"
@ -6791,7 +6791,7 @@ func TestCatalog_topologyCleanupPanic(t *testing.T) {
assert.True(t, watchFired(ws)) assert.True(t, watchFired(ws))
// Now delete the node Foo, and this would panic because of the deletion within an iterator // Now delete the node Foo, and this would panic because of the deletion within an iterator
require.NoError(t, s.DeleteNode(3, "foo")) require.NoError(t, s.DeleteNode(3, "foo", nil))
assert.True(t, watchFired(ws)) assert.True(t, watchFired(ws))
} }
@ -7074,7 +7074,7 @@ func TestCatalog_cleanupGatewayWildcards_panic(t *testing.T) {
require.NoError(t, s.EnsureService(5, "foo", &api2)) require.NoError(t, s.EnsureService(5, "foo", &api2))
// Now delete the node "foo", and this would panic because of the deletion within an iterator // Now delete the node "foo", and this would panic because of the deletion within an iterator
require.NoError(t, s.DeleteNode(6, "foo")) require.NoError(t, s.DeleteNode(6, "foo", nil))
} }
func TestCatalog_DownstreamsForService(t *testing.T) { func TestCatalog_DownstreamsForService(t *testing.T) {

View file

@ -82,7 +82,8 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
// Coordinate returns a map of coordinates for the given node, indexed by // Coordinate returns a map of coordinates for the given node, indexed by
// network segment. // network segment.
func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.CoordinateSet, error) { func (s *Store) Coordinate(ws memdb.WatchSet, node string, _ *structs.EnterpriseMeta) (uint64, lib.CoordinateSet, error) {
// TODO(partitions): use the provided entmeta
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
@ -103,7 +104,8 @@ func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.Coordina
} }
// Coordinates queries for all nodes with coordinates. // Coordinates queries for all nodes with coordinates.
func (s *Store) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) { func (s *Store) Coordinates(ws memdb.WatchSet, _ *structs.EnterpriseMeta) (uint64, structs.Coordinates, error) {
// TODO(partitions): use the provided entmeta
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()

View file

@ -5,13 +5,16 @@ import (
"math/rand" "math/rand"
"testing" "testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
) )
// TODO(partitions): test partitioned nodes here
// generateRandomCoordinate creates a random coordinate. This mucks with the // generateRandomCoordinate creates a random coordinate. This mucks with the
// underlying structure directly, so it's not really useful for any particular // underlying structure directly, so it's not really useful for any particular
// position in the network, but it's a good payload to send through to make // position in the network, but it's a good payload to send through to make
@ -37,7 +40,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Make sure the coordinates list starts out empty, and that a query for // Make sure the coordinates list starts out empty, and that a query for
// a per-node coordinate for a nonexistent node doesn't do anything bad. // a per-node coordinate for a nonexistent node doesn't do anything bad.
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
idx, all, err := s.Coordinates(ws) idx, all, err := s.Coordinates(ws, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -47,7 +50,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
require.Nil(t, all) require.Nil(t, all)
coordinateWs := memdb.NewWatchSet() coordinateWs := memdb.NewWatchSet()
_, coords, err := s.Coordinate("nope", coordinateWs) _, coords, err := s.Coordinate(coordinateWs, "nope", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -75,7 +78,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should still be empty, though applying an empty batch does bump // Should still be empty, though applying an empty batch does bump
// the table index. // the table index.
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, all, err = s.Coordinates(ws) idx, all, err = s.Coordinates(ws, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -85,7 +88,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
require.Nil(t, all) require.Nil(t, all)
coordinateWs = memdb.NewWatchSet() coordinateWs = memdb.NewWatchSet()
idx, _, err = s.Coordinate("node1", coordinateWs) idx, _, err = s.Coordinate(coordinateWs, "node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -105,7 +108,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should go through now. // Should go through now.
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, all, err = s.Coordinates(ws) idx, all, err = s.Coordinates(ws, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -118,7 +121,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
nodeWs := make([]memdb.WatchSet, len(updates)) nodeWs := make([]memdb.WatchSet, len(updates))
for i, update := range updates { for i, update := range updates {
nodeWs[i] = memdb.NewWatchSet() nodeWs[i] = memdb.NewWatchSet()
idx, coords, err := s.Coordinate(update.Node, nodeWs[i]) idx, coords, err := s.Coordinate(nodeWs[i], update.Node, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -146,7 +149,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
} }
// Verify it got applied. // Verify it got applied.
idx, all, err = s.Coordinates(nil) idx, all, err = s.Coordinates(nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -157,7 +160,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// And check the per-node coordinate version of the same thing. // And check the per-node coordinate version of the same thing.
for _, update := range updates { for _, update := range updates {
idx, coords, err := s.Coordinate(update.Node, nil) idx, coords, err := s.Coordinate(nil, update.Node, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -183,7 +186,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Verify we are at the previous state, though the empty batch does bump // Verify we are at the previous state, though the empty batch does bump
// the table index. // the table index.
idx, all, err = s.Coordinates(nil) idx, all, err = s.Coordinates(nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -215,7 +218,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
} }
// Make sure it's in there. // Make sure it's in there.
_, coords, err := s.Coordinate("node1", nil) _, coords, err := s.Coordinate(nil, "node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -226,19 +229,19 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
require.Equal(t, expected, coords) require.Equal(t, expected, coords)
// Now delete the node. // Now delete the node.
if err := s.DeleteNode(3, "node1"); err != nil { if err := s.DeleteNode(3, "node1", nil); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Make sure the coordinate is gone. // Make sure the coordinate is gone.
_, coords, err = s.Coordinate("node1", nil) _, coords, err = s.Coordinate(nil, "node1", nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
require.Equal(t, lib.CoordinateSet{}, coords) require.Equal(t, lib.CoordinateSet{}, coords)
// Make sure the index got updated. // Make sure the index got updated.
idx, all, err := s.Coordinates(nil) idx, all, err := s.Coordinates(nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -326,7 +329,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
restore.Commit() restore.Commit()
// Read the restored coordinates back out and verify that they match. // Read the restored coordinates back out and verify that they match.
idx, res, err := s.Coordinates(nil) idx, res, err := s.Coordinates(nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }

View file

@ -9,10 +9,11 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
) )
func TestStateStore_SessionCreate_SessionGet(t *testing.T) { func TestStateStore_SessionCreate_SessionGet(t *testing.T) {
@ -552,7 +553,7 @@ func TestStateStore_Session_Invalidate_DeleteNode(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := s.DeleteNode(15, "foo"); err != nil { if err := s.DeleteNode(15, "foo", nil); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if !watchFired(ws) { if !watchFired(ws) {
@ -776,7 +777,7 @@ func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := s.DeleteNode(6, "foo"); err != nil { if err := s.DeleteNode(6, "foo", nil); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if !watchFired(ws) { if !watchFired(ws) {
@ -858,7 +859,7 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := s.DeleteNode(6, "foo"); err != nil { if err := s.DeleteNode(6, "foo", nil); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if !watchFired(ws) { if !watchFired(ws) {

View file

@ -5,10 +5,11 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/stretchr/testify/require"
) )
//nolint:staticcheck //nolint:staticcheck
@ -195,7 +196,7 @@ func TestStateStore_Txn_Node(t *testing.T) {
require.Equal(t, expected, results) require.Equal(t, expected, results)
// Pull the resulting state store contents. // Pull the resulting state store contents.
idx, actual, err := s.Nodes(nil) idx, actual, err := s.Nodes(nil, nil)
require.NoError(t, err) require.NoError(t, err)
if idx != 8 { if idx != 8 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)

View file

@ -3,9 +3,10 @@ package state
import ( import (
"testing" "testing"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
) )
func TestStateStore_Usage_NodeCount(t *testing.T) { func TestStateStore_Usage_NodeCount(t *testing.T) {
@ -37,7 +38,7 @@ func TestStateStore_Usage_NodeCount_Delete(t *testing.T) {
require.Equal(t, idx, uint64(1)) require.Equal(t, idx, uint64(1))
require.Equal(t, count, 2) require.Equal(t, count, 2)
require.NoError(t, s.DeleteNode(2, "node2")) require.NoError(t, s.DeleteNode(2, "node2", nil))
idx, count, err = s.NodeCount() idx, count, err = s.NodeCount()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, idx, uint64(2)) require.Equal(t, idx, uint64(2))
@ -83,7 +84,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
require.Equal(t, usage.Services, 1) require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 2) require.Equal(t, usage.ServiceInstances, 2)
require.NoError(t, s.DeleteNode(3, "node1")) require.NoError(t, s.DeleteNode(3, "node1", nil))
idx, usage, err = s.ServiceUsage() idx, usage, err = s.ServiceUsage()
require.NoError(t, err) require.NoError(t, err)

View file

@ -8,13 +8,14 @@ import (
"testing" "testing"
"time" "time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
) )
var testTxnRules = ` var testTxnRules = `
@ -233,7 +234,8 @@ func TestTxn_Apply(t *testing.T) {
t.Fatalf("bad: %v", d) t.Fatalf("bad: %v", d)
} }
_, n, err := state.GetNode("foo") // TODO(partitions)
_, n, err := state.GetNode("foo", nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }