Adds diff check for node and service parts of register requests.
We always did an update before which caused excessive watch churn, even with our new fine-grained queries. This does a diff any only updates the node and service records if something actually changed.
This commit is contained in:
parent
40ef50c738
commit
5afa233d28
|
@ -70,7 +70,7 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest
|
|||
// conditions on state updates.
|
||||
func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
||||
req *structs.RegisterRequest) error {
|
||||
// Add the node.
|
||||
// Create a node structure.
|
||||
node := &structs.Node{
|
||||
ID: req.ID,
|
||||
Node: req.Node,
|
||||
|
@ -78,14 +78,37 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D
|
|||
TaggedAddresses: req.TaggedAddresses,
|
||||
Meta: req.NodeMeta,
|
||||
}
|
||||
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
|
||||
return fmt.Errorf("failed inserting node: %s", err)
|
||||
|
||||
// Since this gets called for all node operations (service and check
|
||||
// updates) and churn on the node itself is basically none after the
|
||||
// node updates itself the first time, it's worth seeing if we need to
|
||||
// modify the node at all so we prevent watch churn and useless writes
|
||||
// and modify index bumps on the node.
|
||||
{
|
||||
existing, err := tx.First("nodes", "id", node.Node)
|
||||
if err != nil {
|
||||
return fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
if existing == nil || req.ChangesNode(existing.(*structs.Node)) {
|
||||
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
|
||||
return fmt.Errorf("failed inserting node: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add the service, if any.
|
||||
// Add the service, if any. We perform a similar check as we do for the
|
||||
// node info above to make sure we actually need to update the service
|
||||
// definition in order to prevent useless churn if nothing has changed.
|
||||
if req.Service != nil {
|
||||
if err := s.ensureServiceTxn(tx, idx, watches, req.Node, req.Service); err != nil {
|
||||
return fmt.Errorf("failed inserting service: %s", err)
|
||||
existing, err := tx.First("services", "id", req.Node, req.Service.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
if existing == nil || !(existing.(*structs.ServiceNode).ToNodeService()).IsSame(req.Service) {
|
||||
if err := s.ensureServiceTxn(tx, idx, watches, req.Node, req.Service); err != nil {
|
||||
return fmt.Errorf("failed inserting service: %s", err)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
}
|
||||
|
||||
// Retrieve the node and verify its contents.
|
||||
verifyNode := func(created, modified uint64) {
|
||||
verifyNode := func() {
|
||||
_, out, err := s.GetNode("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
|
@ -41,11 +41,11 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
len(out.TaggedAddresses) != 1 ||
|
||||
out.TaggedAddresses["hello"] != "world" ||
|
||||
out.Meta["somekey"] != "somevalue" ||
|
||||
out.CreateIndex != created || out.ModifyIndex != modified {
|
||||
out.CreateIndex != 1 || out.ModifyIndex != 1 {
|
||||
t.Fatalf("bad node returned: %#v", out)
|
||||
}
|
||||
}
|
||||
verifyNode(1, 1)
|
||||
verifyNode()
|
||||
|
||||
// Add in a service definition.
|
||||
req.Service = &structs.NodeService{
|
||||
|
@ -59,12 +59,12 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify that the service got registered.
|
||||
verifyService := func(created, modified uint64) {
|
||||
verifyService := func() {
|
||||
idx, out, err := s.NodeServices("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != modified {
|
||||
if idx != 2 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if len(out.Services) != 1 {
|
||||
|
@ -73,7 +73,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
r := out.Services["redis1"]
|
||||
if r == nil || r.ID != "redis1" || r.Service != "redis" ||
|
||||
r.Address != "1.1.1.1" || r.Port != 8080 ||
|
||||
r.CreateIndex != created || r.ModifyIndex != modified {
|
||||
r.CreateIndex != 2 || r.ModifyIndex != 2 {
|
||||
t.Fatalf("bad service returned: %#v", r)
|
||||
}
|
||||
|
||||
|
@ -81,17 +81,17 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != modified {
|
||||
if idx != 2 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if r == nil || r.ID != "redis1" || r.Service != "redis" ||
|
||||
r.Address != "1.1.1.1" || r.Port != 8080 ||
|
||||
r.CreateIndex != created || r.ModifyIndex != modified {
|
||||
r.CreateIndex != 2 || r.ModifyIndex != 2 {
|
||||
t.Fatalf("bad service returned: %#v", r)
|
||||
}
|
||||
}
|
||||
verifyNode(1, 2)
|
||||
verifyService(2, 2)
|
||||
verifyNode()
|
||||
verifyService()
|
||||
|
||||
// Add in a top-level check.
|
||||
req.Check = &structs.HealthCheck{
|
||||
|
@ -104,12 +104,12 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify that the check got registered.
|
||||
verifyCheck := func(created, modified uint64) {
|
||||
verifyCheck := func() {
|
||||
idx, out, err := s.NodeChecks("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != modified {
|
||||
if idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if len(out) != 1 {
|
||||
|
@ -117,7 +117,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
}
|
||||
c := out[0]
|
||||
if c.Node != "node1" || c.CheckID != "check1" || c.Name != "check" ||
|
||||
c.CreateIndex != created || c.ModifyIndex != modified {
|
||||
c.CreateIndex != 3 || c.ModifyIndex != 3 {
|
||||
t.Fatalf("bad check returned: %#v", c)
|
||||
}
|
||||
|
||||
|
@ -125,17 +125,17 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != modified {
|
||||
if idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if c.Node != "node1" || c.CheckID != "check1" || c.Name != "check" ||
|
||||
c.CreateIndex != created || c.ModifyIndex != modified {
|
||||
c.CreateIndex != 3 || c.ModifyIndex != 3 {
|
||||
t.Fatalf("bad check returned: %#v", c)
|
||||
}
|
||||
}
|
||||
verifyNode(1, 3)
|
||||
verifyService(2, 3)
|
||||
verifyCheck(3, 3)
|
||||
verifyNode()
|
||||
verifyService()
|
||||
verifyCheck()
|
||||
|
||||
// Add in another check via the slice.
|
||||
req.Checks = structs.HealthChecks{
|
||||
|
@ -150,9 +150,9 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify that the additional check got registered.
|
||||
verifyNode(1, 4)
|
||||
verifyService(2, 4)
|
||||
func() {
|
||||
verifyNode()
|
||||
verifyService()
|
||||
{
|
||||
idx, out, err := s.NodeChecks("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
|
@ -174,7 +174,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
c2.CreateIndex != 4 || c2.ModifyIndex != 4 {
|
||||
t.Fatalf("bad check returned: %#v", c2)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||
|
@ -192,17 +192,17 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
|||
restore.Commit()
|
||||
|
||||
// Retrieve the node and verify its contents.
|
||||
verifyNode := func(created, modified uint64) {
|
||||
verifyNode := func() {
|
||||
_, out, err := s.GetNode("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if out.Node != "node1" || out.Address != "1.2.3.4" ||
|
||||
out.CreateIndex != created || out.ModifyIndex != modified {
|
||||
out.CreateIndex != 1 || out.ModifyIndex != 1 {
|
||||
t.Fatalf("bad node returned: %#v", out)
|
||||
}
|
||||
}
|
||||
verifyNode(1, 1)
|
||||
verifyNode()
|
||||
|
||||
// Add in a service definition.
|
||||
req.Service = &structs.NodeService{
|
||||
|
@ -218,12 +218,12 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
|||
restore.Commit()
|
||||
|
||||
// Verify that the service got registered.
|
||||
verifyService := func(created, modified uint64) {
|
||||
verifyService := func() {
|
||||
idx, out, err := s.NodeServices("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != modified {
|
||||
if idx != 2 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if len(out.Services) != 1 {
|
||||
|
@ -232,12 +232,10 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
|||
s := out.Services["redis1"]
|
||||
if s.ID != "redis1" || s.Service != "redis" ||
|
||||
s.Address != "1.1.1.1" || s.Port != 8080 ||
|
||||
s.CreateIndex != created || s.ModifyIndex != modified {
|
||||
s.CreateIndex != 2 || s.ModifyIndex != 2 {
|
||||
t.Fatalf("bad service returned: %#v", s)
|
||||
}
|
||||
}
|
||||
verifyNode(1, 2)
|
||||
verifyService(2, 2)
|
||||
|
||||
// Add in a top-level check.
|
||||
req.Check = &structs.HealthCheck{
|
||||
|
@ -252,12 +250,12 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
|||
restore.Commit()
|
||||
|
||||
// Verify that the check got registered.
|
||||
verifyCheck := func(created, modified uint64) {
|
||||
verifyCheck := func() {
|
||||
idx, out, err := s.NodeChecks("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != modified {
|
||||
if idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if len(out) != 1 {
|
||||
|
@ -265,13 +263,13 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
|||
}
|
||||
c := out[0]
|
||||
if c.Node != "node1" || c.CheckID != "check1" || c.Name != "check" ||
|
||||
c.CreateIndex != created || c.ModifyIndex != modified {
|
||||
c.CreateIndex != 3 || c.ModifyIndex != 3 {
|
||||
t.Fatalf("bad check returned: %#v", c)
|
||||
}
|
||||
}
|
||||
verifyNode(1, 3)
|
||||
verifyService(2, 3)
|
||||
verifyCheck(3, 3)
|
||||
verifyNode()
|
||||
verifyService()
|
||||
verifyCheck()
|
||||
|
||||
// Add in another check via the slice.
|
||||
req.Checks = structs.HealthChecks{
|
||||
|
@ -288,8 +286,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
|||
restore.Commit()
|
||||
|
||||
// Verify that the additional check got registered.
|
||||
verifyNode(1, 4)
|
||||
verifyService(2, 4)
|
||||
verifyNode()
|
||||
verifyService()
|
||||
func() {
|
||||
idx, out, err := s.NodeChecks("node1")
|
||||
if err != nil {
|
||||
|
@ -318,6 +316,9 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
|||
func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// With the new diffing logic for the node and service structures, we
|
||||
// need to twiddle the request to get the expected watch to fire for
|
||||
// the restore cases below.
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
Address: "1.2.3.4",
|
||||
|
@ -337,6 +338,7 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
|
|||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("checks"), func() {
|
||||
req.Address = "1.2.3.5"
|
||||
restore := s.Restore()
|
||||
if err := restore.Registration(1, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
|
@ -345,16 +347,14 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
|
|||
})
|
||||
})
|
||||
})
|
||||
|
||||
// With a service definition added it should fire nodes and
|
||||
// services.
|
||||
// With a service definition added it should fire just services.
|
||||
req.Service = &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
}
|
||||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("checks"), func() {
|
||||
if err := s.EnsureRegistration(2, req); err != nil {
|
||||
|
@ -363,9 +363,10 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
|
|||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("checks"), func() {
|
||||
req.Service.Address = "1.1.1.2"
|
||||
restore := s.Restore()
|
||||
if err := restore.Registration(2, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
|
@ -375,14 +376,14 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
|
|||
})
|
||||
})
|
||||
|
||||
// Now with a check it should hit all three.
|
||||
// Adding a check should just affect checks.
|
||||
req.Check = &structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "check1",
|
||||
Name: "check",
|
||||
}
|
||||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||
if err := s.EnsureRegistration(3, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
|
@ -390,8 +391,8 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
|
|||
})
|
||||
})
|
||||
})
|
||||
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("nodes"), func() {
|
||||
verifyNoWatch(t, s.getTableWatch("services"), func() {
|
||||
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||
restore := s.Restore()
|
||||
if err := restore.Registration(3, req); err != nil {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
crand "crypto/rand"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/types"
|
||||
|
|
Loading…
Reference in New Issue