From b216d52b6626fab9c5c6e3eddf92604fcb33f6f9 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Mon, 14 Feb 2022 14:39:12 -0600 Subject: [PATCH] server: conditionally avoid writing a config entry to raft if it was already the same (#12321) This will both save on unnecessary raft operations and avoid unnecessarily incrementing the raft modify index of config entries subject to no-op updates. --- .changelog/12321.txt | 3 + agent/consul/config_endpoint.go | 72 ++++++++ agent/consul/config_endpoint_test.go | 255 +++++++++++++++++---------- 3 files changed, 237 insertions(+), 93 deletions(-) create mode 100644 .changelog/12321.txt diff --git a/.changelog/12321.txt b/.changelog/12321.txt new file mode 100644 index 000000000..b8181ecfe --- /dev/null +++ b/.changelog/12321.txt @@ -0,0 +1,3 @@ +```release-note:improvement +server: conditionally avoid writing a config entry to raft if it was already the same +``` diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 0fb5a6ef8..44b211962 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -2,6 +2,7 @@ package consul import ( "fmt" + "reflect" "time" metrics "github.com/armon/go-metrics" @@ -93,6 +94,14 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error if args.Op != structs.ConfigEntryUpsert && args.Op != structs.ConfigEntryUpsertCAS { args.Op = structs.ConfigEntryUpsert } + + if skip, err := c.shouldSkipOperation(args); err != nil { + return err + } else if skip { + *reply = true + return nil + } + resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args) if err != nil { return err @@ -104,6 +113,62 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error return nil } +// shouldSkipOperation returns true if the result of the operation has +// already happened and is safe to skip. 
+// +// It is ok if this incorrectly detects something as changed when it +// in fact has not, the important thing is that it doesn't do +// the reverse and incorrectly detect a change as a no-op. +func (c *ConfigEntry) shouldSkipOperation(args *structs.ConfigEntryRequest) (bool, error) { + state := c.srv.fsm.State() + _, currentEntry, err := state.ConfigEntry(nil, args.Entry.GetKind(), args.Entry.GetName(), args.Entry.GetEnterpriseMeta()) + if err != nil { + return false, fmt.Errorf("error reading current config entry value: %w", err) + } + + switch args.Op { + case structs.ConfigEntryUpsert, structs.ConfigEntryUpsertCAS: + return c.shouldSkipUpsertOperation(currentEntry, args.Entry) + case structs.ConfigEntryDelete, structs.ConfigEntryDeleteCAS: + return (currentEntry == nil), nil + default: + return false, fmt.Errorf("invalid config entry operation type: %v", args.Op) + } +} + +func (c *ConfigEntry) shouldSkipUpsertOperation(currentEntry, updatedEntry structs.ConfigEntry) (bool, error) { + if currentEntry == nil { + return false, nil + } + + if currentEntry.GetKind() != updatedEntry.GetKind() || + currentEntry.GetName() != updatedEntry.GetName() || + !currentEntry.GetEnterpriseMeta().IsSame(updatedEntry.GetEnterpriseMeta()) { + return false, nil + } + + // The only reason a fully Normalized and Validated config entry may + // legitimately differ from the persisted one is due to the embedded + // RaftIndex. + // + // So, to intercept more no-op upserts we temporarily set the new config + // entry's raft index field to that of the existing data for the purposes + // of comparison, and then restore it. 
+ var ( + currentRaftIndex = currentEntry.GetRaftIndex() + userProvidedRaftIndex = updatedEntry.GetRaftIndex() + + currentRaftIndexCopy = *currentRaftIndex + userProvidedRaftIndexCopy = *userProvidedRaftIndex + ) + + *userProvidedRaftIndex = currentRaftIndexCopy // change + same := reflect.DeepEqual(currentEntry, updatedEntry) // compare + *userProvidedRaftIndex = userProvidedRaftIndexCopy // restore + + return same, nil +} + // Get returns a single config entry by Kind/Name. func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigEntryResponse) error { if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { @@ -309,6 +374,13 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *structs.Co args.Op = structs.ConfigEntryDelete } + if skip, err := c.shouldSkipOperation(args); err != nil { + return err + } else if skip { + reply.Deleted = true + return nil + } + rsp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args) if err != nil { return err diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 2dbe5dba3..fce29efe3 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -45,68 +45,115 @@ func TestConfigEntry_Apply(t *testing.T) { // wait for cross-dc queries to work testrpc.WaitForLeader(t, s2.RPC, "dc1") - updated := &structs.ServiceConfigEntry{ - Name: "foo", - } - // originally target this as going to dc2 - args := structs.ConfigEntryRequest{ - Datacenter: "dc2", - Entry: updated, - } - out := false - require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Apply", &args, &out)) - require.True(t, out) - - // the previous RPC should not return until the primary has been updated but will return - // before the secondary has the data. 
- state := s1.fsm.State() - _, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) - require.NoError(t, err) - - serviceConf, ok := entry.(*structs.ServiceConfigEntry) - require.True(t, ok) - require.Equal(t, "foo", serviceConf.Name) - require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) - - retry.Run(t, func(r *retry.R) { - // wait for replication to happen - state := s2.fsm.State() - _, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) - require.NoError(r, err) - require.NotNil(r, entry) - // this test is not testing that the config entries that are replicated are correct as thats done elsewhere. + runStep(t, "send the apply request to dc2 - it should get forwarded to dc1", func(t *testing.T) { + updated := &structs.ServiceConfigEntry{ + Name: "foo", + } + args := structs.ConfigEntryRequest{ + Datacenter: "dc2", + Entry: updated, + } + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Apply", &args, &out)) + require.True(t, out) }) - updated = &structs.ServiceConfigEntry{ - Name: "foo", - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeLocal, - }, - } + var originalModifyIndex uint64 + runStep(t, "verify the entry was updated in the primary and secondary", func(t *testing.T) { + // the previous RPC should not return until the primary has been updated but will return + // before the secondary has the data. 
+ _, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) + require.NoError(t, err) - args = structs.ConfigEntryRequest{ - Datacenter: "dc1", - Op: structs.ConfigEntryUpsertCAS, - Entry: updated, - } + serviceConf, ok := entry.(*structs.ServiceConfigEntry) + require.True(t, ok) + require.Equal(t, "foo", serviceConf.Name) + require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) - require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) - require.False(t, out) + retry.Run(t, func(r *retry.R) { + // wait for replication to happen + _, entry, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) + require.NoError(r, err) + require.NotNil(r, entry) + // this test is not testing that the config entries that are replicated are correct as thats done elsewhere. + }) + originalModifyIndex = serviceConf.ModifyIndex + }) - args.Entry.GetRaftIndex().ModifyIndex = serviceConf.ModifyIndex - require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) - require.True(t, out) + runStep(t, "update the entry again in the primary", func(t *testing.T) { + updated := &structs.ServiceConfigEntry{ + Name: "foo", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeLocal, + }, + } - state = s1.fsm.State() - _, entry, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) - require.NoError(t, err) + args := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Op: structs.ConfigEntryUpsertCAS, + Entry: updated, + } - serviceConf, ok = entry.(*structs.ServiceConfigEntry) - require.True(t, ok) - require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) - require.Equal(t, "foo", serviceConf.Name) - require.Equal(t, "", serviceConf.Protocol) - require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) + runStep(t, "with the wrong CAS", func(t *testing.T) { + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, 
&out)) + require.False(t, out) + }) + runStep(t, "with the correct CAS", func(t *testing.T) { + var out bool + args.Entry.GetRaftIndex().ModifyIndex = originalModifyIndex + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) + require.True(t, out) + }) + }) + + runStep(t, "verify the entry was updated in the state store", func(t *testing.T) { + _, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) + require.NoError(t, err) + + serviceConf, ok := entry.(*structs.ServiceConfigEntry) + require.True(t, ok) + require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) + require.Equal(t, "foo", serviceConf.Name) + require.Equal(t, "", serviceConf.Protocol) + require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) + }) + + runStep(t, "verify no-op updates do not advance the raft indexes", func(t *testing.T) { + var modifyIndex uint64 + for i := 0; i < 3; i++ { + runStep(t, fmt.Sprintf("iteration %d", i), func(t *testing.T) { + args := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Op: structs.ConfigEntryUpsert, + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "noop", + Protocol: "grpc", + }, + } + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) + require.True(t, out) + + getIndex, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "noop", nil) + require.NoError(t, err) + require.NotNil(t, entry) + + listIndex, entries, err := s1.fsm.State().ConfigEntries(nil, nil) + require.NoError(t, err) + require.Len(t, entries, 2) + + if i == 0 { + modifyIndex = entry.GetRaftIndex().ModifyIndex + } else { + require.Equal(t, modifyIndex, entry.GetRaftIndex().ModifyIndex) + require.Equal(t, modifyIndex, getIndex) + require.Equal(t, modifyIndex, listIndex) + } + }) + } + }) } func TestConfigEntry_ProxyDefaultsMeshGateway(t *testing.T) { @@ -623,48 +670,70 @@ func TestConfigEntry_Delete(t *testing.T) { // wait for 
cross-dc queries to work testrpc.WaitForLeader(t, s2.RPC, "dc1") - // Create a dummy service in the state store to look up. - entry := &structs.ServiceConfigEntry{ - Kind: structs.ServiceDefaults, - Name: "foo", - } - state := s1.fsm.State() - require.NoError(t, state.EnsureConfigEntry(1, entry)) - - // Verify it's there. - _, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) - require.NoError(t, err) - - serviceConf, ok := existing.(*structs.ServiceConfigEntry) - require.True(t, ok) - require.Equal(t, "foo", serviceConf.Name) - require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) - - retry.Run(t, func(r *retry.R) { - // wait for it to be replicated into the secondary dc - _, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) - require.NoError(r, err) - require.NotNil(r, existing) + runStep(t, "create a dummy service in the state store to look up", func(t *testing.T) { + entry := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + } + require.NoError(t, s1.fsm.State().EnsureConfigEntry(1, entry)) }) - // send the delete request to dc2 - it should get forwarded to dc1. - args := structs.ConfigEntryRequest{ - Datacenter: "dc2", - } - args.Entry = entry - var out struct{} - require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Delete", &args, &out)) + runStep(t, "verify it exists in the primary and is replicated to the secondary", func(t *testing.T) { + // Verify it's there. + _, existing, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) + require.NoError(t, err) - // Verify the entry was deleted. 
- _, existing, err = s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) - require.NoError(t, err) - require.Nil(t, existing) + serviceConf, ok := existing.(*structs.ServiceConfigEntry) + require.True(t, ok) + require.Equal(t, "foo", serviceConf.Name) + require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) - // verify it gets deleted from the secondary too - retry.Run(t, func(r *retry.R) { - _, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) - require.NoError(r, err) - require.Nil(r, existing) + retry.Run(t, func(r *retry.R) { + // wait for it to be replicated into the secondary dc + _, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) + require.NoError(r, err) + require.NotNil(r, existing) + }) + }) + + runStep(t, "send the delete request to dc2 - it should get forwarded to dc1", func(t *testing.T) { + args := structs.ConfigEntryRequest{ + Datacenter: "dc2", + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + }, + } + var out structs.ConfigEntryDeleteResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Delete", &args, &out)) + require.True(t, out.Deleted) + }) + + runStep(t, "verify the entry was deleted in the primary and secondary", func(t *testing.T) { + // Verify the entry was deleted. 
+ _, existing, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) + require.NoError(t, err) + require.Nil(t, existing) + + // verify it gets deleted from the secondary too + retry.Run(t, func(r *retry.R) { + _, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) + require.NoError(r, err) + require.Nil(r, existing) + }) + }) + + runStep(t, "delete in dc1 again - should be fine", func(t *testing.T) { + args := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + }, + } + var out structs.ConfigEntryDeleteResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out)) + require.True(t, out.Deleted) }) }