diff --git a/.changelog/12307.txt b/.changelog/12307.txt new file mode 100644 index 000000000..67286211d --- /dev/null +++ b/.changelog/12307.txt @@ -0,0 +1,3 @@ +```release-note:bug +server: partly fix config entry replication issue that prevents replication in some circumstances +``` diff --git a/agent/consul/config_replication.go b/agent/consul/config_replication.go index 243cd8bb3..ac43efa96 100644 --- a/agent/consul/config_replication.go +++ b/agent/consul/config_replication.go @@ -8,6 +8,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/consul/agent/structs" ) @@ -91,6 +92,7 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con ticker := time.NewTicker(time.Second / time.Duration(s.config.ConfigReplicationApplyLimit)) defer ticker.Stop() + var merr error for i, entry := range configs { // Exported services only apply to the primary datacenter. if entry.GetKind() == structs.ExportedServices { @@ -104,7 +106,7 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con _, err := s.raftApply(structs.ConfigEntryRequestType, &req) if err != nil { - return false, fmt.Errorf("Failed to apply config %s: %v", op, err) + merr = multierror.Append(merr, fmt.Errorf("Failed to apply config entry %s: %w", op, err)) } if i < len(configs)-1 { @@ -117,7 +119,7 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con } } - return false, nil + return false, merr } func (s *Server) fetchConfigEntries(lastRemoteIndex uint64) (*structs.IndexedGenericConfigEntries, error) { @@ -204,6 +206,7 @@ func (s *Server) replicateConfig(ctx context.Context, lastRemoteIndex uint64, lo "updates", len(updates), ) + var merr error if len(deletions) > 0 { logger.Debug("Deleting local config entries", "deletions", len(deletions), @@ -214,9 +217,10 @@ func (s *Server) replicateConfig(ctx context.Context, lastRemoteIndex uint64, lo return 0, true, nil } if err != nil { - return 0, false, fmt.Errorf("failed to delete local config entries: %v", err) + merr = multierror.Append(merr, err) + } else { + logger.Debug("Config Entry replication - finished deletions") } - logger.Debug("Config Entry replication - finished deletions") } if len(updates) > 0 { @@ -228,9 +232,14 @@ func (s *Server) replicateConfig(ctx context.Context, lastRemoteIndex uint64, lo return 0, true, nil } if err != nil { - return 0, false, fmt.Errorf("failed to update local config entries: %v", err) + merr = multierror.Append(merr, err) + } else { + logger.Debug("Config Entry replication - finished updates") } - logger.Debug("Config Entry replication - finished updates") + } + + if merr != nil { + return 0, false, merr } // Return the index we got back from the remote side, since we've synced diff --git a/agent/consul/config_replication_test.go b/agent/consul/config_replication_test.go index 5231d43a4..24cf0d4e4 100644 --- a/agent/consul/config_replication_test.go +++ b/agent/consul/config_replication_test.go @@ -247,3 +247,100 @@ func TestReplication_ConfigEntries(t *testing.T) { checkSame(r) }) } + +func TestReplication_ConfigEntries_GraphValidationErrorDuringReplication(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + _, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + }) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + _, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.ConfigReplicationRate = 100 + c.ConfigReplicationBurst = 100 + c.ConfigReplicationApplyLimit = 1000000 + }) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + + // Create two entries that will replicate in the wrong order and not work. + entries := []structs.ConfigEntry{ + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "http", + }, + &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: "foo", + Listeners: []structs.IngressListener{ + { + Port: 9191, + Protocol: "http", + Services: []structs.IngressService{ + { + Name: "foo", + }, + }, + }, + }, + }, + } + for _, entry := range entries { + arg := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Op: structs.ConfigEntryUpsert, + Entry: entry, + } + + out := false + require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out)) + } + + // Try to join which should kick off replication. + joinWAN(t, s2, s1) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s1.RPC, "dc2") + + checkSame := func(t require.TestingT) error { + _, remote, err := s1.fsm.State().ConfigEntries(nil, structs.ReplicationEnterpriseMeta()) + require.NoError(t, err) + _, local, err := s2.fsm.State().ConfigEntries(nil, structs.ReplicationEnterpriseMeta()) + require.NoError(t, err) + + require.Len(t, local, len(remote)) + for i, entry := range remote { + require.Equal(t, entry.GetKind(), local[i].GetKind()) + require.Equal(t, entry.GetName(), local[i].GetName()) + + // more validations + switch entry.GetKind() { + case structs.IngressGateway: + localGw, ok := local[i].(*structs.IngressGatewayConfigEntry) + require.True(t, ok) + remoteGw, ok := entry.(*structs.IngressGatewayConfigEntry) + require.True(t, ok) + require.Len(t, remoteGw.Listeners, 1) + require.Len(t, localGw.Listeners, 1) + require.Equal(t, remoteGw.Listeners[0].Protocol, localGw.Listeners[0].Protocol) + case structs.ServiceDefaults: + localSvc, ok := local[i].(*structs.ServiceConfigEntry) + require.True(t, ok) + remoteSvc, ok := entry.(*structs.ServiceConfigEntry) + require.True(t, ok) + require.Equal(t, remoteSvc.Protocol, localSvc.Protocol) + } + } + return nil + } + + // Wait for the replica to converge. + retry.Run(t, func(r *retry.R) { + checkSame(r) + }) +}