// Source: open-consul/agent/consul/config_replication_test.go (333 lines, 9.6 KiB, Go)

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package consul
import (
"context"
"fmt"
"github.com/oklog/ulid/v2"
"github.com/stretchr/testify/assert"
"os"
"testing"
// (history-view timestamp: 2021-11-08 16:58:49 +00:00)
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
)
func TestReplication_ConfigEntries(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
client := rpcClient(t, s1)
defer client.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.ConfigReplicationRate = 100
c.ConfigReplicationBurst = 100
c.ConfigReplicationApplyLimit = 1000000
})
testrpc.WaitForLeader(t, s2.RPC, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join.
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
// Create some new configuration entries
var entries []structs.ConfigEntry
for i := 0; i < 50; i++ {
arg := structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryUpsert,
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf("svc-%d", i),
Protocol: "tcp",
},
}
Centralized Config CLI (#5731) * Add HTTP endpoints for config entry management * Finish implementing decoding in the HTTP Config entry apply endpoint * Add CAS operation to the config entry apply endpoint Also use this for the bootstrapping and move the config entry decoding function into the structs package. * First pass at the API client for the config entries * Fixup some of the ConfigEntry APIs Return a singular response object instead of a list for the ConfigEntry.Get RPC. This gets plumbed through the HTTP API as well. Dont return QueryMeta in the JSON response for the config entry listing HTTP API. Instead just return a list of config entries. * Minor API client fixes * Attempt at some ConfigEntry api client tests These don’t currently work due to weak typing in JSON * Get some of the api client tests passing * Implement reflectwalk magic to correct JSON encoding a ProxyConfigEntry Also added a test for the HTTP endpoint that exposes the problem. However, since the test doesn’t actually do the JSON encode/decode its still failing. * Move MapWalk magic into a binary marshaller instead of JSON. * Add a MapWalk test * Get rid of unused func * Get rid of unused imports * Fixup some tests now that the decoding from msgpack coerces things into json compat types * Stub out most of the central config cli Fully implement the config read command. * Basic config delete command implementation * Implement config write command * Implement config list subcommand Not entirely sure about the output here. Its basically the read output indented with a line specifying the kind/name of each type which is also duplicated in the indented output. * Update command usage * Update some help usage formatting * Add the connect enable helper cli command * Update list command output * Rename the config entry API client methods. * Use renamed apis * Implement config write tests Stub the others with the noTabs tests. 
* Change list output format Now just simply output 1 line per named config * Add config read tests * Add invalid args write test. * Add config delete tests * Add config list tests * Add connect enable tests * Update some CLI commands to use CAS ops This also modifies the HTTP API for a write op to return a boolean indicating whether the value was written or not. * Fix up the HTTP API CAS tests as I realized they weren’t testing what they should. * Update config entry rpc tests to properly test CAS * Fix up a few more tests * Fix some tests that using ConfigEntries.Apply * Update config_write_test.go * Get rid of unused import
2019-04-30 23:27:16 +00:00
out := false
require.NoError(t, s1.RPC(context.Background(), "ConfigEntry.Apply", &arg, &out))
entries = append(entries, arg.Entry)
}
arg := structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryUpsert,
Entry: &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"foo": "bar",
"bar": 1,
},
},
}
Centralized Config CLI (#5731) * Add HTTP endpoints for config entry management * Finish implementing decoding in the HTTP Config entry apply endpoint * Add CAS operation to the config entry apply endpoint Also use this for the bootstrapping and move the config entry decoding function into the structs package. * First pass at the API client for the config entries * Fixup some of the ConfigEntry APIs Return a singular response object instead of a list for the ConfigEntry.Get RPC. This gets plumbed through the HTTP API as well. Dont return QueryMeta in the JSON response for the config entry listing HTTP API. Instead just return a list of config entries. * Minor API client fixes * Attempt at some ConfigEntry api client tests These don’t currently work due to weak typing in JSON * Get some of the api client tests passing * Implement reflectwalk magic to correct JSON encoding a ProxyConfigEntry Also added a test for the HTTP endpoint that exposes the problem. However, since the test doesn’t actually do the JSON encode/decode its still failing. * Move MapWalk magic into a binary marshaller instead of JSON. * Add a MapWalk test * Get rid of unused func * Get rid of unused imports * Fixup some tests now that the decoding from msgpack coerces things into json compat types * Stub out most of the central config cli Fully implement the config read command. * Basic config delete command implementation * Implement config write command * Implement config list subcommand Not entirely sure about the output here. Its basically the read output indented with a line specifying the kind/name of each type which is also duplicated in the indented output. * Update command usage * Update some help usage formatting * Add the connect enable helper cli command * Update list command output * Rename the config entry API client methods. * Use renamed apis * Implement config write tests Stub the others with the noTabs tests. 
* Change list output format Now just simply output 1 line per named config * Add config read tests * Add invalid args write test. * Add config delete tests * Add config list tests * Add connect enable tests * Update some CLI commands to use CAS ops This also modifies the HTTP API for a write op to return a boolean indicating whether the value was written or not. * Fix up the HTTP API CAS tests as I realized they weren’t testing what they should. * Update config entry rpc tests to properly test CAS * Fix up a few more tests * Fix some tests that using ConfigEntries.Apply * Update config_write_test.go * Get rid of unused import
2019-04-30 23:27:16 +00:00
out := false
require.NoError(t, s1.RPC(context.Background(), "ConfigEntry.Apply", &arg, &out))
entries = append(entries, arg.Entry)
checkSame := func(t *retry.R) error {
_, remote, err := s1.fsm.State().ConfigEntries(nil, structs.ReplicationEnterpriseMeta())
require.NoError(t, err)
_, local, err := s2.fsm.State().ConfigEntries(nil, structs.ReplicationEnterpriseMeta())
require.NoError(t, err)
require.Len(t, local, len(remote))
for i, entry := range remote {
require.Equal(t, entry.GetKind(), local[i].GetKind())
require.Equal(t, entry.GetName(), local[i].GetName())
// more validations
switch entry.GetKind() {
case structs.ServiceDefaults:
localSvc, ok := local[i].(*structs.ServiceConfigEntry)
require.True(t, ok)
remoteSvc, ok := entry.(*structs.ServiceConfigEntry)
require.True(t, ok)
require.Equal(t, remoteSvc.Protocol, localSvc.Protocol)
case structs.ProxyDefaults:
localProxy, ok := local[i].(*structs.ProxyConfigEntry)
require.True(t, ok)
remoteProxy, ok := entry.(*structs.ProxyConfigEntry)
require.True(t, ok)
require.Equal(t, remoteProxy.Config, localProxy.Config)
}
}
return nil
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
// Update those policies
for i := 0; i < 50; i++ {
arg := structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryUpsert,
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf("svc-%d", i),
Protocol: "udp",
},
}
Centralized Config CLI (#5731) * Add HTTP endpoints for config entry management * Finish implementing decoding in the HTTP Config entry apply endpoint * Add CAS operation to the config entry apply endpoint Also use this for the bootstrapping and move the config entry decoding function into the structs package. * First pass at the API client for the config entries * Fixup some of the ConfigEntry APIs Return a singular response object instead of a list for the ConfigEntry.Get RPC. This gets plumbed through the HTTP API as well. Dont return QueryMeta in the JSON response for the config entry listing HTTP API. Instead just return a list of config entries. * Minor API client fixes * Attempt at some ConfigEntry api client tests These don’t currently work due to weak typing in JSON * Get some of the api client tests passing * Implement reflectwalk magic to correct JSON encoding a ProxyConfigEntry Also added a test for the HTTP endpoint that exposes the problem. However, since the test doesn’t actually do the JSON encode/decode its still failing. * Move MapWalk magic into a binary marshaller instead of JSON. * Add a MapWalk test * Get rid of unused func * Get rid of unused imports * Fixup some tests now that the decoding from msgpack coerces things into json compat types * Stub out most of the central config cli Fully implement the config read command. * Basic config delete command implementation * Implement config write command * Implement config list subcommand Not entirely sure about the output here. Its basically the read output indented with a line specifying the kind/name of each type which is also duplicated in the indented output. * Update command usage * Update some help usage formatting * Add the connect enable helper cli command * Update list command output * Rename the config entry API client methods. * Use renamed apis * Implement config write tests Stub the others with the noTabs tests. 
* Change list output format Now just simply output 1 line per named config * Add config read tests * Add invalid args write test. * Add config delete tests * Add config list tests * Add connect enable tests * Update some CLI commands to use CAS ops This also modifies the HTTP API for a write op to return a boolean indicating whether the value was written or not. * Fix up the HTTP API CAS tests as I realized they weren’t testing what they should. * Update config entry rpc tests to properly test CAS * Fix up a few more tests * Fix some tests that using ConfigEntries.Apply * Update config_write_test.go * Get rid of unused import
2019-04-30 23:27:16 +00:00
out := false
require.NoError(t, s1.RPC(context.Background(), "ConfigEntry.Apply", &arg, &out))
}
arg = structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryUpsert,
Entry: &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"foo": "baz",
"baz": 2,
},
},
}
require.NoError(t, s1.RPC(context.Background(), "ConfigEntry.Apply", &arg, &out))
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
for _, entry := range entries {
arg := structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryDelete,
Entry: entry,
}
var out structs.ConfigEntryDeleteResponse
require.NoError(t, s1.RPC(context.Background(), "ConfigEntry.Delete", &arg, &out))
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
}
server: partly fix config entry replication issue that prevents replication in some circumstances (#12307) There are some cross-config-entry relationships that are enforced during "graph validation" at persistence time that are required to be maintained. This means that config entries may form a digraph at times. Config entry replication procedes in a particular sorted order by kind and name. Occasionally there are some fixups to these digraphs that end up replicating in the wrong order and replicating the leaves (ingress-gateway) before the roots (service-defaults) leading to replication halting due to a graph validation error related to things like mismatched service protocol requirements. This PR changes replication to give each computed change (upsert/delete) a fair shot at being applied before deciding to terminate that round of replication in error. In the case where we've simply tried to do the operations in the wrong order at least ONE of the outstanding requests will complete in the right order, leading the subsequent round to have fewer operations to do, with a smaller likelihood of graph validation errors. This does not address all scenarios, but for scenarios where the edits are being applied in the wrong order this should avoid replication halting. Fixes #9319 The scenario that is NOT ADDRESSED by this PR is as follows: 1. create: service-defaults: name=new-web, protocol=http 2. create: service-defaults: name=old-web, protocol=http 3. create: service-resolver: name=old-web, redirect-to=new-web 4. delete: service-resolver: name=old-web 5. update: service-defaults: name=old-web, protocol=grpc 6. update: service-defaults: name=new-web, protocol=grpc 7. create: service-resolver: name=old-web, redirect-to=new-web If you shutdown dc2 just before (4) and turn it back on after (7) replication is impossible as there is no single edit you can make to make forward progress.
2022-02-23 23:27:48 +00:00
func TestReplication_ConfigEntries_GraphValidationErrorDuringReplication(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
_, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")
_, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.ConfigReplicationRate = 100
c.ConfigReplicationBurst = 100
c.ConfigReplicationApplyLimit = 1000000
})
testrpc.WaitForLeader(t, s2.RPC, "dc2")
// Create two entries that will replicate in the wrong order and not work.
entries := []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Protocol: "http",
},
&structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo",
Listeners: []structs.IngressListener{
{
Port: 9191,
Protocol: "http",
Services: []structs.IngressService{
{
Name: "foo",
},
},
},
},
},
}
for _, entry := range entries {
arg := structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryUpsert,
Entry: entry,
}
out := false
require.NoError(t, s1.RPC(context.Background(), "ConfigEntry.Apply", &arg, &out))
server: partly fix config entry replication issue that prevents replication in some circumstances (#12307) There are some cross-config-entry relationships that are enforced during "graph validation" at persistence time that are required to be maintained. This means that config entries may form a digraph at times. Config entry replication procedes in a particular sorted order by kind and name. Occasionally there are some fixups to these digraphs that end up replicating in the wrong order and replicating the leaves (ingress-gateway) before the roots (service-defaults) leading to replication halting due to a graph validation error related to things like mismatched service protocol requirements. This PR changes replication to give each computed change (upsert/delete) a fair shot at being applied before deciding to terminate that round of replication in error. In the case where we've simply tried to do the operations in the wrong order at least ONE of the outstanding requests will complete in the right order, leading the subsequent round to have fewer operations to do, with a smaller likelihood of graph validation errors. This does not address all scenarios, but for scenarios where the edits are being applied in the wrong order this should avoid replication halting. Fixes #9319 The scenario that is NOT ADDRESSED by this PR is as follows: 1. create: service-defaults: name=new-web, protocol=http 2. create: service-defaults: name=old-web, protocol=http 3. create: service-resolver: name=old-web, redirect-to=new-web 4. delete: service-resolver: name=old-web 5. update: service-defaults: name=old-web, protocol=grpc 6. update: service-defaults: name=new-web, protocol=grpc 7. create: service-resolver: name=old-web, redirect-to=new-web If you shutdown dc2 just before (4) and turn it back on after (7) replication is impossible as there is no single edit you can make to make forward progress.
2022-02-23 23:27:48 +00:00
}
// Try to join which should kick off replication.
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
checkSame := func(t require.TestingT) error {
_, remote, err := s1.fsm.State().ConfigEntries(nil, structs.ReplicationEnterpriseMeta())
require.NoError(t, err)
_, local, err := s2.fsm.State().ConfigEntries(nil, structs.ReplicationEnterpriseMeta())
require.NoError(t, err)
require.Len(t, local, len(remote))
for i, entry := range remote {
require.Equal(t, entry.GetKind(), local[i].GetKind())
require.Equal(t, entry.GetName(), local[i].GetName())
// more validations
switch entry.GetKind() {
case structs.IngressGateway:
localGw, ok := local[i].(*structs.IngressGatewayConfigEntry)
require.True(t, ok)
remoteGw, ok := entry.(*structs.IngressGatewayConfigEntry)
require.True(t, ok)
require.Len(t, remoteGw.Listeners, 1)
require.Len(t, localGw.Listeners, 1)
require.Equal(t, remoteGw.Listeners[0].Protocol, localGw.Listeners[0].Protocol)
case structs.ServiceDefaults:
localSvc, ok := local[i].(*structs.ServiceConfigEntry)
require.True(t, ok)
remoteSvc, ok := entry.(*structs.ServiceConfigEntry)
require.True(t, ok)
require.Equal(t, remoteSvc.Protocol, localSvc.Protocol)
}
}
return nil
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
}
// createConfigEntries returns num service config entries for use as diff
// fixtures. Each entry is named with a freshly generated ULID string, and the
// entry at position i carries ModifyIndex indexStart+i. No other fields are
// set.
func createConfigEntries(num int, indexStart int) []structs.ConfigEntry {
	fixtures := make([]structs.ConfigEntry, 0, num)
	for offset := 0; offset < num; offset++ {
		svc := &structs.ServiceConfigEntry{
			Name:      ulid.Make().String(),
			RaftIndex: structs.RaftIndex{ModifyIndex: uint64(offset + indexStart)},
		}
		fixtures = append(fixtures, svc)
	}
	return fixtures
}
// mutateIDs returns a new slice of service config entries that reuse the names
// of e, in the same order, but with ModifyIndex renumbered so that the entry
// at position i carries indexStart+i. The input entries are not modified, and
// only the name is copied from them.
func mutateIDs(e []structs.ConfigEntry, indexStart int) []structs.ConfigEntry {
	renumbered := make([]structs.ConfigEntry, 0, len(e))
	for pos, src := range e {
		renumbered = append(renumbered, &structs.ServiceConfigEntry{
			Name:      src.GetName(),
			RaftIndex: structs.RaftIndex{ModifyIndex: uint64(pos + indexStart)},
		})
	}
	return renumbered
}
// Test_diffConfigEntries is a table-driven test of diffConfigEntries. Each
// case supplies a local and a remote entry list plus a last-seen remote index,
// and states which entries are expected back as updates and as deletions.
//
// Cases with normalize set call Normalize on every local and remote entry
// before diffing. The final case skips that step on purpose.
// NOTE(review): presumably Normalize is what populates the entry hash that the
// case name refers to — confirm against structs.ServiceConfigEntry.
func Test_diffConfigEntries(t *testing.T) {
	type args struct {
		local           []structs.ConfigEntry
		remote          []structs.ConfigEntry
		lastRemoteIndex uint64
		normalize       bool
	}
	type testCase struct {
		name    string
		args    args
		updated []structs.ConfigEntry
		deleted []structs.ConfigEntry
	}

	// Fixtures: two disjoint sets, their concatenation, and two copies of the
	// first set that keep its names but carry different ModifyIndex values.
	entries1 := createConfigEntries(10, 10)
	entries2 := createConfigEntries(10, 20)
	entries3 := append(entries1, entries2...)
	entries4 := mutateIDs(entries1, 20)
	entries5 := mutateIDs(entries1, 0)

	cases := []testCase{
		{
			name: "empty",
			args: args{local: []structs.ConfigEntry{}, remote: []structs.ConfigEntry{}, normalize: true},
		},
		{
			name: "same",
			args: args{local: entries1, remote: entries1, normalize: true},
		},
		{
			name:    "new remote",
			args:    args{remote: entries1, normalize: true},
			updated: entries1,
		},
		{
			name:    "extra remote",
			args:    args{local: entries1, remote: entries3, normalize: true},
			updated: entries2,
		},
		{
			name:    "extra local",
			args:    args{local: entries3, remote: entries1, normalize: true},
			deleted: entries2,
		},
		{
			name: "same, same size, different raft ID",
			args: args{local: entries1, remote: entries4, normalize: true},
		},
		{
			name:    "when hash is empty, avoid hash compare",
			args:    args{local: entries5, remote: entries4, normalize: false},
			updated: entries4,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			in := tc.args
			if in.normalize {
				for idx := range in.local {
					require.NoError(t, in.local[idx].Normalize())
				}
				for idx := range in.remote {
					require.NoError(t, in.remote[idx].Normalize())
				}
			}

			deletions, updates := diffConfigEntries(in.local, in.remote, in.lastRemoteIndex)
			assert.Equalf(t, tc.updated, updates, "updated diffConfigEntries(%v, %v, %v)", in.local, in.remote, in.lastRemoteIndex)
			assert.Equalf(t, tc.deleted, deletions, "deleted diffConfigEntries(%v, %v, %v)", in.local, in.remote, in.lastRemoteIndex)
		})
	}
}