state: add tests for new txn ops
This commit is contained in:
parent
a40a346be8
commit
41e8120d3d
|
@ -738,10 +738,10 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService)
|
|||
// ensureServiceCASTxn updates a service only if the existing index matches the given index.
|
||||
// Returns a bool indicating if a write happened and any error.
|
||||
func (s *Store) ensureServiceCASTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) (bool, error) {
|
||||
// Retrieve the existing entry.
|
||||
existing, err := tx.First("nodes", "id", node)
|
||||
// Retrieve the existing service.
|
||||
existing, err := tx.First("services", "id", node, svc.ID)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("node lookup failed: %s", err)
|
||||
return false, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
||||
// Check if the we should do the set. A ModifyIndex of 0 means that
|
||||
|
|
|
@ -132,10 +132,12 @@ func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (struc
|
|||
entry, err = getNodeIDTxn(tx, op.Node.ID)
|
||||
|
||||
case api.NodeSet:
|
||||
entry = &op.Node
|
||||
err = s.ensureNodeTxn(tx, idx, &op.Node)
|
||||
|
||||
case api.NodeCAS:
|
||||
var ok bool
|
||||
entry = &op.Node
|
||||
ok, err = s.ensureNodeCASTxn(tx, idx, &op.Node)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to set node %q, index is stale", op.Node.Node)
|
||||
|
@ -185,6 +187,7 @@ func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp)
|
|||
entry, err = s.nodeServiceTxn(tx, op.Node, op.Service.ID)
|
||||
|
||||
case api.ServiceSet:
|
||||
entry = &op.Service
|
||||
err = s.ensureServiceTxn(tx, idx, op.Node, &op.Service)
|
||||
|
||||
case api.ServiceCAS:
|
||||
|
@ -192,7 +195,9 @@ func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp)
|
|||
ok, err = s.ensureServiceCASTxn(tx, idx, op.Node, &op.Service)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to set service %q on node %q, index is stale", op.Service.ID, op.Node)
|
||||
break
|
||||
}
|
||||
entry, err = s.nodeServiceTxn(tx, op.Node, op.Service.ID)
|
||||
|
||||
case api.ServiceDelete:
|
||||
err = s.deleteServiceTxn(tx, idx, op.Node, op.Service.ID)
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/pascaldekloe/goe/verify"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -116,6 +118,379 @@ func TestStateStore_Txn_Intention(t *testing.T) {
|
|||
verify.Values(t, "", actual, intentions)
|
||||
}
|
||||
|
||||
func TestStateStore_Txn_Node(t *testing.T) {
|
||||
require := require.New(t)
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create some nodes.
|
||||
var nodes [5]structs.Node
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
nodes[i] = structs.Node{
|
||||
Node: fmt.Sprintf("node%d", i+1),
|
||||
ID: types.NodeID(testUUID()),
|
||||
}
|
||||
|
||||
// Leave node5 to be created by an operation.
|
||||
if i < 5 {
|
||||
s.EnsureNode(uint64(i+1), &nodes[i])
|
||||
}
|
||||
}
|
||||
|
||||
// Set up a transaction that hits every operation.
|
||||
ops := structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Node: &structs.TxnNodeOp{
|
||||
Verb: api.NodeGet,
|
||||
Node: nodes[0],
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Node: &structs.TxnNodeOp{
|
||||
Verb: api.NodeSet,
|
||||
Node: nodes[4],
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Node: &structs.TxnNodeOp{
|
||||
Verb: api.NodeCAS,
|
||||
Node: structs.Node{
|
||||
Node: "node2",
|
||||
ID: nodes[1].ID,
|
||||
Datacenter: "dc2",
|
||||
RaftIndex: structs.RaftIndex{ModifyIndex: 2},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Node: &structs.TxnNodeOp{
|
||||
Verb: api.NodeDelete,
|
||||
Node: structs.Node{Node: "node3"},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Node: &structs.TxnNodeOp{
|
||||
Verb: api.NodeDeleteCAS,
|
||||
Node: structs.Node{
|
||||
Node: "node4",
|
||||
RaftIndex: structs.RaftIndex{ModifyIndex: 4},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
results, errors := s.TxnRW(8, ops)
|
||||
if len(errors) > 0 {
|
||||
t.Fatalf("err: %v", errors)
|
||||
}
|
||||
|
||||
// Make sure the response looks as expected.
|
||||
nodes[1].Datacenter = "dc2"
|
||||
nodes[1].ModifyIndex = 8
|
||||
expected := structs.TxnResults{
|
||||
&structs.TxnResult{
|
||||
Node: &nodes[0],
|
||||
},
|
||||
&structs.TxnResult{
|
||||
Node: &nodes[4],
|
||||
},
|
||||
&structs.TxnResult{
|
||||
Node: &nodes[1],
|
||||
},
|
||||
}
|
||||
verify.Values(t, "", results, expected)
|
||||
|
||||
// Pull the resulting state store contents.
|
||||
idx, actual, err := s.Nodes(nil)
|
||||
require.NoError(err)
|
||||
if idx != 8 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Make sure it looks as expected.
|
||||
expectedNodes := structs.Nodes{&nodes[0], &nodes[1], &nodes[4]}
|
||||
verify.Values(t, "", actual, expectedNodes)
|
||||
}
|
||||
|
||||
func TestStateStore_Txn_Service(t *testing.T) {
|
||||
require := require.New(t)
|
||||
s := testStateStore(t)
|
||||
|
||||
testRegisterNode(t, s, 1, "node1")
|
||||
|
||||
// Create some services.
|
||||
for i := 1; i <= 4; i++ {
|
||||
testRegisterService(t, s, uint64(i+1), "node1", fmt.Sprintf("svc%d", i))
|
||||
}
|
||||
|
||||
// Set up a transaction that hits every operation.
|
||||
ops := structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Service: &structs.TxnServiceOp{
|
||||
Verb: api.ServiceGet,
|
||||
Node: "node1",
|
||||
Service: structs.NodeService{ID: "svc1"},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Service: &structs.TxnServiceOp{
|
||||
Verb: api.ServiceSet,
|
||||
Node: "node1",
|
||||
Service: structs.NodeService{ID: "svc5"},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Service: &structs.TxnServiceOp{
|
||||
Verb: api.ServiceCAS,
|
||||
Node: "node1",
|
||||
Service: structs.NodeService{
|
||||
ID: "svc2",
|
||||
Tags: []string{"modified"},
|
||||
RaftIndex: structs.RaftIndex{ModifyIndex: 3},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Service: &structs.TxnServiceOp{
|
||||
Verb: api.ServiceDelete,
|
||||
Node: "node1",
|
||||
Service: structs.NodeService{ID: "svc3"},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Service: &structs.TxnServiceOp{
|
||||
Verb: api.ServiceDeleteCAS,
|
||||
Node: "node1",
|
||||
Service: structs.NodeService{
|
||||
ID: "svc4",
|
||||
RaftIndex: structs.RaftIndex{ModifyIndex: 5},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
results, errors := s.TxnRW(6, ops)
|
||||
if len(errors) > 0 {
|
||||
t.Fatalf("err: %v", errors)
|
||||
}
|
||||
|
||||
// Make sure the response looks as expected.
|
||||
expected := structs.TxnResults{
|
||||
&structs.TxnResult{
|
||||
Service: &structs.NodeService{
|
||||
ID: "svc1",
|
||||
Service: "svc1",
|
||||
Address: "1.1.1.1",
|
||||
Port: 1111,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
},
|
||||
Weights: &structs.Weights{Passing: 1, Warning: 1},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
Service: &structs.NodeService{
|
||||
ID: "svc5",
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
Service: &structs.NodeService{
|
||||
ID: "svc2",
|
||||
Tags: []string{"modified"},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 3,
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
Weights: &structs.Weights{Passing: 1, Warning: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
verify.Values(t, "", results, expected)
|
||||
|
||||
// Pull the resulting state store contents.
|
||||
idx, actual, err := s.NodeServices(nil, "node1")
|
||||
require.NoError(err)
|
||||
if idx != 6 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Make sure it looks as expected.
|
||||
expectedServices := &structs.NodeServices{
|
||||
Node: &structs.Node{
|
||||
Node: "node1",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
},
|
||||
},
|
||||
Services: map[string]*structs.NodeService{
|
||||
"svc1": &structs.NodeService{
|
||||
ID: "svc1",
|
||||
Service: "svc1",
|
||||
Address: "1.1.1.1",
|
||||
Port: 1111,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
},
|
||||
Weights: &structs.Weights{Passing: 1, Warning: 1},
|
||||
},
|
||||
"svc5": &structs.NodeService{
|
||||
ID: "svc5",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 6,
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
Weights: &structs.Weights{Passing: 1, Warning: 1},
|
||||
},
|
||||
"svc2": &structs.NodeService{
|
||||
ID: "svc2",
|
||||
Tags: []string{"modified"},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 3,
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
Weights: &structs.Weights{Passing: 1, Warning: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
verify.Values(t, "", actual, expectedServices)
|
||||
}
|
||||
|
||||
func TestStateStore_Txn_Checks(t *testing.T) {
|
||||
require := require.New(t)
|
||||
s := testStateStore(t)
|
||||
|
||||
testRegisterNode(t, s, 1, "node1")
|
||||
|
||||
// Create some checks.
|
||||
for i := 1; i <= 4; i++ {
|
||||
testRegisterCheck(t, s, uint64(i+1), "node1", "", types.CheckID(fmt.Sprintf("check%d", i)), "failing")
|
||||
}
|
||||
|
||||
// Set up a transaction that hits every operation.
|
||||
ops := structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Check: &structs.TxnCheckOp{
|
||||
Verb: api.CheckGet,
|
||||
Check: structs.HealthCheck{Node: "node1", CheckID: "check1"},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Check: &structs.TxnCheckOp{
|
||||
Verb: api.CheckSet,
|
||||
Check: structs.HealthCheck{Node: "node1", CheckID: "check5", Status: "passing"},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Check: &structs.TxnCheckOp{
|
||||
Verb: api.CheckCAS,
|
||||
Check: structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check2",
|
||||
Status: "warning",
|
||||
RaftIndex: structs.RaftIndex{ModifyIndex: 3},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Check: &structs.TxnCheckOp{
|
||||
Verb: api.CheckDelete,
|
||||
Check: structs.HealthCheck{Node: "node1", CheckID: "check3"},
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Check: &structs.TxnCheckOp{
|
||||
Verb: api.CheckDeleteCAS,
|
||||
Check: structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check4",
|
||||
RaftIndex: structs.RaftIndex{ModifyIndex: 5},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
results, errors := s.TxnRW(6, ops)
|
||||
if len(errors) > 0 {
|
||||
t.Fatalf("err: %v", errors)
|
||||
}
|
||||
|
||||
// Make sure the response looks as expected.
|
||||
expected := structs.TxnResults{
|
||||
&structs.TxnResult{
|
||||
Check: &structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check1",
|
||||
Status: "failing",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
Check: &structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check5",
|
||||
Status: "passing",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 6,
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.TxnResult{
|
||||
Check: &structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check2",
|
||||
Status: "warning",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 3,
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
verify.Values(t, "", results, expected)
|
||||
|
||||
// Pull the resulting state store contents.
|
||||
idx, actual, err := s.NodeChecks(nil, "node1")
|
||||
require.NoError(err)
|
||||
if idx != 6 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Make sure it looks as expected.
|
||||
expectedChecks := structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check1",
|
||||
Status: "failing",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
},
|
||||
},
|
||||
&structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check2",
|
||||
Status: "warning",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 3,
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
},
|
||||
&structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check5",
|
||||
Status: "passing",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 6,
|
||||
ModifyIndex: 6,
|
||||
},
|
||||
},
|
||||
}
|
||||
verify.Values(t, "", actual, expectedChecks)
|
||||
}
|
||||
|
||||
func TestStateStore_Txn_KVS(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
Loading…
Reference in New Issue