server: partly fix config entry replication issue that prevents replication in some circumstances (#12307)

There are some cross-config-entry relationships that are enforced during
"graph validation" at persistence time and that must continue to hold
afterwards. This means that config entries may form a digraph at times.

Config entry replication proceeds in a particular sorted order: by kind,
then by name.
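
For illustration, a minimal sketch of that ordering (the `configEntryRef` type here is a hypothetical stand-in, not the actual replication code, which works on `structs.ConfigEntry` values):

```go
package main

import (
	"fmt"
	"sort"
)

// configEntryRef is a hypothetical stand-in for a config entry's identity.
type configEntryRef struct {
	Kind string
	Name string
}

func main() {
	entries := []configEntryRef{
		{Kind: "service-defaults", Name: "foo"},
		{Kind: "ingress-gateway", Name: "foo"},
	}
	// Sort by kind, then by name -- the order replication walks entries in.
	sort.Slice(entries, func(i, j int) bool {
		if entries[i].Kind != entries[j].Kind {
			return entries[i].Kind < entries[j].Kind
		}
		return entries[i].Name < entries[j].Name
	})
	// Note that "ingress-gateway" sorts before "service-defaults", so the
	// leaf entry is visited before the root it depends on.
	fmt.Println(entries) // [{ingress-gateway foo} {service-defaults foo}]
}
```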

Occasionally, edits to these digraphs replicate in the wrong order,
applying the leaves (ingress-gateway) before the roots
(service-defaults). Replication then halts with a graph validation error
about things like mismatched service protocol requirements.
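
As a concrete illustration of one such constraint (a simplified sketch; the real graph validation happens in Consul's state store, and these types are stand-ins):

```go
package main

import "fmt"

// Simplified stand-ins for service-defaults and ingress-gateway entries.
type serviceDefaults struct {
	Protocol string
}

type ingressListener struct {
	Protocol string
	Service  string
}

// validateListener mimics one cross-entry constraint: an ingress listener's
// protocol must agree with the protocol configured for the service it exposes.
func validateListener(l ingressListener, defaults map[string]serviceDefaults) error {
	if sd, ok := defaults[l.Service]; ok && sd.Protocol != l.Protocol {
		return fmt.Errorf("service %q uses protocol %q but listener wants %q",
			l.Service, sd.Protocol, l.Protocol)
	}
	return nil
}

func main() {
	// If the ingress-gateway (leaf) replicates before the service-defaults
	// (root), the service still has the default tcp protocol and the http
	// listener fails validation.
	defaults := map[string]serviceDefaults{"foo": {Protocol: "tcp"}}
	fmt.Println(validateListener(ingressListener{Protocol: "http", Service: "foo"}, defaults))
}
```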

This PR changes replication to give each computed change (upsert/delete)
a fair shot at being applied before deciding to terminate that round of
replication in error. In the case where we've simply attempted the
operations in the wrong order, at least ONE of the outstanding requests
will complete in the right order, leaving the subsequent round with
fewer operations to do and a smaller likelihood of graph validation
errors.
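
The shape of the change, as a sketch (the `change` and `applyAll` names here are hypothetical; the real code below threads this through `reconcileLocalConfig`):

```go
package main

import (
	"fmt"

	multierror "github.com/hashicorp/go-multierror"
)

type change struct {
	Op, Kind, Name string
}

// applyAll gives every computed change a fair shot in each round. Failures
// are collected instead of aborting, so changes that merely arrived in the
// wrong order succeed on a later round once their dependencies land.
func applyAll(changes []change, apply func(change) error) error {
	var merr error
	for _, c := range changes {
		if err := apply(c); err != nil {
			merr = multierror.Append(merr, err)
		}
	}
	return merr // nil only when every change applied cleanly
}

func main() {
	err := applyAll([]change{{"upsert", "service-defaults", "foo"}}, func(c change) error {
		fmt.Println("applying", c.Op, c.Kind, c.Name)
		return nil
	})
	fmt.Println(err) // <nil>
}
```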

This does not address all scenarios, but in the ones where the edits
were simply applied in the wrong order it should keep replication from
halting.

Fixes #9319

The scenario that is NOT ADDRESSED by this PR is as follows:

1. create: service-defaults: name=new-web, protocol=http
2. create: service-defaults: name=old-web, protocol=http
3. create: service-resolver: name=old-web, redirect-to=new-web
4. delete: service-resolver: name=old-web
5. update: service-defaults: name=old-web, protocol=grpc
6. update: service-defaults: name=new-web, protocol=grpc
7. create: service-resolver: name=old-web, redirect-to=new-web

If you shut down dc2 just before (4) and bring it back up after (7),
replication is impossible: there is no single edit that makes forward
progress. The service-resolver is identical on both sides, so
replication computes no change for it, and updating either
service-defaults entry alone leaves the redirect pointing across an
http/grpc protocol mismatch that graph validation rejects.
R.B. Boyer 2022-02-23 17:27:48 -06:00 committed by GitHub
parent 4b528edbe6
commit d860384731
3 changed files with 115 additions and 6 deletions

.changelog/12307.txt

@@ -0,0 +1,3 @@
```release-note:bug
server: partly fix config entry replication issue that prevents replication in some circumstances
```


```diff
@@ -8,6 +8,7 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/go-multierror"
 
 	"github.com/hashicorp/consul/agent/structs"
 )
@@ -91,6 +92,7 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con
 	ticker := time.NewTicker(time.Second / time.Duration(s.config.ConfigReplicationApplyLimit))
 	defer ticker.Stop()
 
+	var merr error
 	for i, entry := range configs {
 		// Exported services only apply to the primary datacenter.
 		if entry.GetKind() == structs.ExportedServices {
@@ -104,7 +106,7 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con
 		_, err := s.raftApply(structs.ConfigEntryRequestType, &req)
 		if err != nil {
-			return false, fmt.Errorf("Failed to apply config %s: %v", op, err)
+			merr = multierror.Append(merr, fmt.Errorf("Failed to apply config entry %s: %w", op, err))
 		}
 
 		if i < len(configs)-1 {
@@ -117,7 +119,7 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con
 		}
 	}
 
-	return false, nil
+	return false, merr
 }
 
 func (s *Server) fetchConfigEntries(lastRemoteIndex uint64) (*structs.IndexedGenericConfigEntries, error) {
@@ -204,6 +206,7 @@ func (s *Server) replicateConfig(ctx context.Context, lastRemoteIndex uint64, lo
 		"updates", len(updates),
 	)
 
+	var merr error
 	if len(deletions) > 0 {
 		logger.Debug("Deleting local config entries",
 			"deletions", len(deletions),
@@ -214,9 +217,10 @@ func (s *Server) replicateConfig(ctx context.Context, lastRemoteIndex uint64, lo
 			return 0, true, nil
 		}
 		if err != nil {
-			return 0, false, fmt.Errorf("failed to delete local config entries: %v", err)
+			merr = multierror.Append(merr, err)
+		} else {
+			logger.Debug("Config Entry replication - finished deletions")
 		}
-		logger.Debug("Config Entry replication - finished deletions")
 	}
 
 	if len(updates) > 0 {
@@ -228,9 +232,14 @@ func (s *Server) replicateConfig(ctx context.Context, lastRemoteIndex uint64, lo
 			return 0, true, nil
 		}
 		if err != nil {
-			return 0, false, fmt.Errorf("failed to update local config entries: %v", err)
+			merr = multierror.Append(merr, err)
+		} else {
+			logger.Debug("Config Entry replication - finished updates")
 		}
-		logger.Debug("Config Entry replication - finished updates")
+	}
+
+	if merr != nil {
+		return 0, false, merr
 	}
 
 	// Return the index we got back from the remote side, since we've synced
```
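
A note on the `multierror.Append` pattern used above: appending to a nil error lazily allocates the combined error, so `merr` stays nil (and the round still succeeds) when every apply goes through. A small standalone demonstration:

```go
package main

import (
	"errors"
	"fmt"

	multierror "github.com/hashicorp/go-multierror"
)

func main() {
	var merr error
	fmt.Println(merr == nil) // true: nothing failed yet

	// Each failed apply is recorded without aborting the loop.
	merr = multierror.Append(merr, errors.New("Failed to apply config entry upsert"))
	merr = multierror.Append(merr, errors.New("Failed to apply config entry delete"))

	// One combined error is returned for the whole round.
	fmt.Println(merr)
}
```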


```diff
@@ -247,3 +247,100 @@ func TestReplication_ConfigEntries(t *testing.T) {
 		checkSame(r)
 	})
 }
+
+func TestReplication_ConfigEntries_GraphValidationErrorDuringReplication(t *testing.T) {
+	if testing.Short() {
+		t.Skip("too slow for testing.Short")
+	}
+	t.Parallel()
+
+	_, s1 := testServerWithConfig(t, func(c *Config) {
+		c.PrimaryDatacenter = "dc1"
+	})
+	testrpc.WaitForLeader(t, s1.RPC, "dc1")
+
+	_, s2 := testServerWithConfig(t, func(c *Config) {
+		c.Datacenter = "dc2"
+		c.PrimaryDatacenter = "dc1"
+		c.ConfigReplicationRate = 100
+		c.ConfigReplicationBurst = 100
+		c.ConfigReplicationApplyLimit = 1000000
+	})
+	testrpc.WaitForLeader(t, s2.RPC, "dc2")
+
+	// Create two entries that will replicate in the wrong order and not work.
+	entries := []structs.ConfigEntry{
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "foo",
+			Protocol: "http",
+		},
+		&structs.IngressGatewayConfigEntry{
+			Kind: structs.IngressGateway,
+			Name: "foo",
+			Listeners: []structs.IngressListener{
+				{
+					Port:     9191,
+					Protocol: "http",
+					Services: []structs.IngressService{
+						{
+							Name: "foo",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	for _, entry := range entries {
+		arg := structs.ConfigEntryRequest{
+			Datacenter: "dc1",
+			Op:         structs.ConfigEntryUpsert,
+			Entry:      entry,
+		}
+		out := false
+		require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out))
+	}
+
+	// Try to join which should kick off replication.
+	joinWAN(t, s2, s1)
+	testrpc.WaitForLeader(t, s1.RPC, "dc1")
+	testrpc.WaitForLeader(t, s1.RPC, "dc2")
+
+	checkSame := func(t require.TestingT) error {
+		_, remote, err := s1.fsm.State().ConfigEntries(nil, structs.ReplicationEnterpriseMeta())
+		require.NoError(t, err)
+		_, local, err := s2.fsm.State().ConfigEntries(nil, structs.ReplicationEnterpriseMeta())
+		require.NoError(t, err)
+
+		require.Len(t, local, len(remote))
+		for i, entry := range remote {
+			require.Equal(t, entry.GetKind(), local[i].GetKind())
+			require.Equal(t, entry.GetName(), local[i].GetName())
+
+			// more validations
+			switch entry.GetKind() {
+			case structs.IngressGateway:
+				localGw, ok := local[i].(*structs.IngressGatewayConfigEntry)
+				require.True(t, ok)
+				remoteGw, ok := entry.(*structs.IngressGatewayConfigEntry)
+				require.True(t, ok)
+				require.Len(t, remoteGw.Listeners, 1)
+				require.Len(t, localGw.Listeners, 1)
+				require.Equal(t, remoteGw.Listeners[0].Protocol, localGw.Listeners[0].Protocol)
+			case structs.ServiceDefaults:
+				localSvc, ok := local[i].(*structs.ServiceConfigEntry)
+				require.True(t, ok)
+				remoteSvc, ok := entry.(*structs.ServiceConfigEntry)
+				require.True(t, ok)
+				require.Equal(t, remoteSvc.Protocol, localSvc.Protocol)
+			}
+		}
+		return nil
+	}
+
+	// Wait for the replica to converge.
+	retry.Run(t, func(r *retry.R) {
+		checkSame(r)
+	})
+}
```
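
To run just the new test locally, something like the following should work (the package path is assumed from where the replication tests appear to live):

```
go test ./agent/consul -run TestReplication_ConfigEntries_GraphValidationErrorDuringReplication
```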