server: conditionally avoid writing a config entry to raft if it was already the same (#12321)

This will both save on unnecessary raft operations as well as
unnecessarily incrementing the raft modify index of config entries
subject to no-op updates.
This commit is contained in:
R.B. Boyer 2022-02-14 14:39:12 -06:00 committed by GitHub
parent ef8cc33949
commit b216d52b66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 237 additions and 93 deletions

3
.changelog/12321.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
server: conditionally avoid writing a config entry to raft if it was already the same
```

View File

@ -2,6 +2,7 @@ package consul
import ( import (
"fmt" "fmt"
"reflect"
"time" "time"
metrics "github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
@ -93,6 +94,14 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error
if args.Op != structs.ConfigEntryUpsert && args.Op != structs.ConfigEntryUpsertCAS { if args.Op != structs.ConfigEntryUpsert && args.Op != structs.ConfigEntryUpsertCAS {
args.Op = structs.ConfigEntryUpsert args.Op = structs.ConfigEntryUpsert
} }
if skip, err := c.shouldSkipOperation(args); err != nil {
return err
} else if skip {
*reply = true
return nil
}
resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args) resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
if err != nil { if err != nil {
return err return err
@ -104,6 +113,62 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error
return nil return nil
} }
// shouldSkipOperation returns true if the result of the operation has
// already happened and is safe to skip.
//
// It is ok if this incorrectly detects something as changed when it
// in fact has not, the important thing is that it doesn't do
// the reverse and incorrectly detect a change as a no-op.
func (c *ConfigEntry) shouldSkipOperation(args *structs.ConfigEntryRequest) (bool, error) {
state := c.srv.fsm.State()
_, currentEntry, err := state.ConfigEntry(nil, args.Entry.GetKind(), args.Entry.GetName(), args.Entry.GetEnterpriseMeta())
if err != nil {
return false, fmt.Errorf("error reading current config entry value: %w", err)
}
switch args.Op {
case structs.ConfigEntryUpsert, structs.ConfigEntryUpsertCAS:
return c.shouldSkipUpsertOperation(currentEntry, args.Entry)
case structs.ConfigEntryDelete, structs.ConfigEntryDeleteCAS:
return (currentEntry == nil), nil
default:
return false, fmt.Errorf("invalid config entry operation type: %v", args.Op)
}
}
func (c *ConfigEntry) shouldSkipUpsertOperation(currentEntry, updatedEntry structs.ConfigEntry) (bool, error) {
if currentEntry == nil {
return false, nil
}
if currentEntry.GetKind() != updatedEntry.GetKind() ||
currentEntry.GetName() != updatedEntry.GetName() ||
!currentEntry.GetEnterpriseMeta().IsSame(updatedEntry.GetEnterpriseMeta()) {
return false, nil
}
// The only reason a fully Normalized and Validated config entry may
// legitimately differ from the persisted one is due to the embedded
// RaftIndex.
//
// So, to intercept more no-op upserts we temporarily set the new config
// entry's raft index field to that of the existing data for the purposes
// of comparison, and then restore it.
var (
currentRaftIndex = currentEntry.GetRaftIndex()
userProvidedRaftIndex = updatedEntry.GetRaftIndex()
currentRaftIndexCopy = *currentRaftIndex
userProvidedRaftIndexCopy = *userProvidedRaftIndex
)
*userProvidedRaftIndex = currentRaftIndexCopy // change
same := reflect.DeepEqual(currentEntry, updatedEntry) // compare
*userProvidedRaftIndex = userProvidedRaftIndexCopy // restore
return same, nil
}
// Get returns a single config entry by Kind/Name. // Get returns a single config entry by Kind/Name.
func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigEntryResponse) error { func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigEntryResponse) error {
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
@ -309,6 +374,13 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *structs.Co
args.Op = structs.ConfigEntryDelete args.Op = structs.ConfigEntryDelete
} }
if skip, err := c.shouldSkipOperation(args); err != nil {
return err
} else if skip {
reply.Deleted = true
return nil
}
rsp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args) rsp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
if err != nil { if err != nil {
return err return err

View File

@ -45,68 +45,115 @@ func TestConfigEntry_Apply(t *testing.T) {
// wait for cross-dc queries to work // wait for cross-dc queries to work
testrpc.WaitForLeader(t, s2.RPC, "dc1") testrpc.WaitForLeader(t, s2.RPC, "dc1")
updated := &structs.ServiceConfigEntry{ runStep(t, "send the apply request to dc2 - it should get forwarded to dc1", func(t *testing.T) {
Name: "foo", updated := &structs.ServiceConfigEntry{
} Name: "foo",
// originally target this as going to dc2 }
args := structs.ConfigEntryRequest{ args := structs.ConfigEntryRequest{
Datacenter: "dc2", Datacenter: "dc2",
Entry: updated, Entry: updated,
} }
out := false var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Apply", &args, &out)) require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Apply", &args, &out))
require.True(t, out) require.True(t, out)
// the previous RPC should not return until the primary has been updated but will return
// before the secondary has the data.
state := s1.fsm.State()
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(t, err)
serviceConf, ok := entry.(*structs.ServiceConfigEntry)
require.True(t, ok)
require.Equal(t, "foo", serviceConf.Name)
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
retry.Run(t, func(r *retry.R) {
// wait for replication to happen
state := s2.fsm.State()
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(r, err)
require.NotNil(r, entry)
// this test is not testing that the config entries that are replicated are correct as thats done elsewhere.
}) })
updated = &structs.ServiceConfigEntry{ var originalModifyIndex uint64
Name: "foo", runStep(t, "verify the entry was updated in the primary and secondary", func(t *testing.T) {
MeshGateway: structs.MeshGatewayConfig{ // the previous RPC should not return until the primary has been updated but will return
Mode: structs.MeshGatewayModeLocal, // before the secondary has the data.
}, _, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
} require.NoError(t, err)
args = structs.ConfigEntryRequest{ serviceConf, ok := entry.(*structs.ServiceConfigEntry)
Datacenter: "dc1", require.True(t, ok)
Op: structs.ConfigEntryUpsertCAS, require.Equal(t, "foo", serviceConf.Name)
Entry: updated, require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) retry.Run(t, func(r *retry.R) {
require.False(t, out) // wait for replication to happen
_, entry, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(r, err)
require.NotNil(r, entry)
// this test is not testing that the config entries that are replicated are correct as thats done elsewhere.
})
originalModifyIndex = serviceConf.ModifyIndex
})
args.Entry.GetRaftIndex().ModifyIndex = serviceConf.ModifyIndex runStep(t, "update the entry again in the primary", func(t *testing.T) {
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) updated := &structs.ServiceConfigEntry{
require.True(t, out) Name: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
}
state = s1.fsm.State() args := structs.ConfigEntryRequest{
_, entry, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) Datacenter: "dc1",
require.NoError(t, err) Op: structs.ConfigEntryUpsertCAS,
Entry: updated,
}
serviceConf, ok = entry.(*structs.ServiceConfigEntry) runStep(t, "with the wrong CAS", func(t *testing.T) {
require.True(t, ok) var out bool
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
require.Equal(t, "foo", serviceConf.Name) require.False(t, out)
require.Equal(t, "", serviceConf.Protocol) })
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) runStep(t, "with the correct CAS", func(t *testing.T) {
var out bool
args.Entry.GetRaftIndex().ModifyIndex = originalModifyIndex
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
require.True(t, out)
})
})
runStep(t, "verify the entry was updated in the state store", func(t *testing.T) {
_, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(t, err)
serviceConf, ok := entry.(*structs.ServiceConfigEntry)
require.True(t, ok)
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
require.Equal(t, "foo", serviceConf.Name)
require.Equal(t, "", serviceConf.Protocol)
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
})
runStep(t, "verify no-op updates do not advance the raft indexes", func(t *testing.T) {
var modifyIndex uint64
for i := 0; i < 3; i++ {
runStep(t, fmt.Sprintf("iteration %d", i), func(t *testing.T) {
args := structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryUpsert,
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "noop",
Protocol: "grpc",
},
}
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
require.True(t, out)
getIndex, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "noop", nil)
require.NoError(t, err)
require.NotNil(t, entry)
listIndex, entries, err := s1.fsm.State().ConfigEntries(nil, nil)
require.NoError(t, err)
require.Len(t, entries, 2)
if i == 0 {
modifyIndex = entry.GetRaftIndex().ModifyIndex
} else {
require.Equal(t, modifyIndex, entry.GetRaftIndex().ModifyIndex)
require.Equal(t, modifyIndex, getIndex)
require.Equal(t, modifyIndex, listIndex)
}
})
}
})
} }
func TestConfigEntry_ProxyDefaultsMeshGateway(t *testing.T) { func TestConfigEntry_ProxyDefaultsMeshGateway(t *testing.T) {
@ -623,48 +670,70 @@ func TestConfigEntry_Delete(t *testing.T) {
// wait for cross-dc queries to work // wait for cross-dc queries to work
testrpc.WaitForLeader(t, s2.RPC, "dc1") testrpc.WaitForLeader(t, s2.RPC, "dc1")
// Create a dummy service in the state store to look up. runStep(t, "create a dummy service in the state store to look up", func(t *testing.T) {
entry := &structs.ServiceConfigEntry{ entry := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults, Kind: structs.ServiceDefaults,
Name: "foo", Name: "foo",
} }
state := s1.fsm.State() require.NoError(t, s1.fsm.State().EnsureConfigEntry(1, entry))
require.NoError(t, state.EnsureConfigEntry(1, entry))
// Verify it's there.
_, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(t, err)
serviceConf, ok := existing.(*structs.ServiceConfigEntry)
require.True(t, ok)
require.Equal(t, "foo", serviceConf.Name)
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
retry.Run(t, func(r *retry.R) {
// wait for it to be replicated into the secondary dc
_, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(r, err)
require.NotNil(r, existing)
}) })
// send the delete request to dc2 - it should get forwarded to dc1. runStep(t, "verify it exists in the primary and is replicated to the secondary", func(t *testing.T) {
args := structs.ConfigEntryRequest{ // Verify it's there.
Datacenter: "dc2", _, existing, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
} require.NoError(t, err)
args.Entry = entry
var out struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Delete", &args, &out))
// Verify the entry was deleted. serviceConf, ok := existing.(*structs.ServiceConfigEntry)
_, existing, err = s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) require.True(t, ok)
require.NoError(t, err) require.Equal(t, "foo", serviceConf.Name)
require.Nil(t, existing) require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
// verify it gets deleted from the secondary too retry.Run(t, func(r *retry.R) {
retry.Run(t, func(r *retry.R) { // wait for it to be replicated into the secondary dc
_, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) _, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(r, err) require.NoError(r, err)
require.Nil(r, existing) require.NotNil(r, existing)
})
})
runStep(t, "send the delete request to dc2 - it should get forwarded to dc1", func(t *testing.T) {
args := structs.ConfigEntryRequest{
Datacenter: "dc2",
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
},
}
var out structs.ConfigEntryDeleteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Delete", &args, &out))
require.True(t, out.Deleted)
})
runStep(t, "verify the entry was deleted in the primary and secondary", func(t *testing.T) {
// Verify the entry was deleted.
_, existing, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(t, err)
require.Nil(t, existing)
// verify it gets deleted from the secondary too
retry.Run(t, func(r *retry.R) {
_, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(r, err)
require.Nil(r, existing)
})
})
runStep(t, "delete in dc1 again - should be fine", func(t *testing.T) {
args := structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
},
}
var out structs.ConfigEntryDeleteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out))
require.True(t, out.Deleted)
}) })
} }