diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 558abf4be..29678d1d0 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -476,6 +476,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // Peerings require.NoError(t, fsm.state.PeeringWrite(31, &pbpeering.Peering{ + ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5", Name: "baz", })) diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 3e2f6c8ff..169ca833f 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -7,13 +7,13 @@ import ( "testing" "time" - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/api" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -62,6 +62,10 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { _, found := s1.peeringService.StreamStatus(token.PeerID) require.False(t, found) + var ( + s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + ) + // Bring up s2 and store s1's token so that it attempts to dial. _, s2 := testServerWithConfig(t, func(c *Config) { c.NodeName = "s2.dc2" @@ -73,6 +77,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { // Simulate a peering initiation event by writing a peering with data from a peering token. // Eventually the leader in dc2 should dial and connect to the leader in dc1. p := &pbpeering.Peering{ + ID: s2PeerID, Name: "my-peer-s1", PeerID: token.PeerID, PeerCAPems: token.CA, @@ -92,6 +97,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { // Delete the peering to trigger the termination sequence. deleted := &pbpeering.Peering{ + ID: s2PeerID, Name: "my-peer-s1", DeletedAt: structs.TimeToProto(time.Now()), } @@ -151,6 +157,11 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { var token structs.PeeringToken require.NoError(t, json.Unmarshal(tokenJSON, &token)) + var ( + s1PeerID = token.PeerID + s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + ) + // Bring up s2 and store s1's token so that it attempts to dial. _, s2 := testServerWithConfig(t, func(c *Config) { c.NodeName = "s2.dc2" @@ -162,6 +173,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { // Simulate a peering initiation event by writing a peering with data from a peering token. // Eventually the leader in dc2 should dial and connect to the leader in dc1. p := &pbpeering.Peering{ + ID: s2PeerID, Name: "my-peer-s1", PeerID: token.PeerID, PeerCAPems: token.CA, @@ -181,6 +193,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { // Delete the peering from the server peer to trigger the termination sequence. deleted := &pbpeering.Peering{ + ID: s1PeerID, Name: "my-peer-s2", DeletedAt: structs.TimeToProto(time.Now()), } @@ -216,6 +229,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) { testrpc.WaitForLeader(t, s1.RPC, "dc1") var ( + peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" peerName = "my-peer-s2" defaultMeta = acl.DefaultEnterpriseMeta() lastIdx = uint64(0) @@ -224,6 +238,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) { // Simulate a peering initiation event by writing a peering to the state store. lastIdx++ require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID, Name: peerName, })) @@ -233,6 +248,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) { // Mark the peering for deletion to trigger the termination sequence. lastIdx++ require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID, Name: peerName, DeletedAt: structs.TimeToProto(time.Now()), })) diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 0ba3463c8..047569f11 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -143,6 +143,17 @@ type peeringApply struct { srv *Server } +func (a *peeringApply) CheckPeeringUUID(id string) (bool, error) { + state := a.srv.fsm.State() + if _, existing, err := state.PeeringReadByID(nil, id); err != nil { + return false, err + } else if existing != nil { + return false, nil + } + + return true, nil +} + func (a *peeringApply) PeeringWrite(req *pbpeering.PeeringWriteRequest) error { _, err := a.srv.raftApplyProtobuf(structs.PeeringWriteType, req) return err diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go index 3d115707d..6515055b2 100644 --- a/agent/consul/state/peering.go +++ b/agent/consul/state/peering.go @@ -1,12 +1,12 @@ package state import ( + "errors" "fmt" "strings" "github.com/golang/protobuf/proto" "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-uuid" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -191,50 +191,47 @@ func (s *Store) peeringListTxn(ws memdb.WatchSet, tx ReadTxn, entMeta acl.Enterp return idx, result, nil } -func generatePeeringUUID(tx ReadTxn) (string, error) { - for { - uuid, err := uuid.GenerateUUID() - if err != nil { - return "", fmt.Errorf("failed to generate UUID: %w", err) - } - existing, err := peeringReadByIDTxn(tx, nil, uuid) - if err != nil { - return "", fmt.Errorf("failed to read peering: %w", err) - } - if existing == nil { - return uuid, nil - } - } -} - func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error { tx := s.db.WriteTxn(idx) defer tx.Abort() - q := Query{ - Value: p.Name, - EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(p.Partition), + // Check that the ID and Name are set. + if p.ID == "" { + return errors.New("Missing Peering ID") } - existingRaw, err := tx.First(tablePeering, indexName, q) - if err != nil { - return fmt.Errorf("failed peering lookup: %w", err) + if p.Name == "" { + return errors.New("Missing Peering Name") } - existing, ok := existingRaw.(*pbpeering.Peering) - if existingRaw != nil && !ok { - return fmt.Errorf("invalid type %T", existingRaw) + // ensure the name is unique (cannot conflict with another peering with a different ID) + _, existing, err := peeringReadTxn(tx, nil, Query{ + Value: p.Name, + EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(p.Partition), + }) + if err != nil { + return err } if existing != nil { + if p.ID != existing.ID { + return fmt.Errorf("A peering already exists with the name %q and a different ID %q", p.Name, existing.ID) + } // Prevent modifications to Peering marked for deletion if !existing.IsActive() { return fmt.Errorf("cannot write to peering that is marked for deletion") } p.CreateIndex = existing.CreateIndex - p.ID = existing.ID - + p.ModifyIndex = idx } else { + idMatch, err := peeringReadByIDTxn(tx, nil, p.ID) + if err != nil { + return err + } + if idMatch != nil { + return fmt.Errorf("A peering already exists with the ID %q and a different name %q", p.Name, existing.ID) + } + if !p.IsActive() { return fmt.Errorf("cannot create a new peering marked for deletion") } @@ -242,13 +239,8 @@ func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error { // TODO(peering): consider keeping PeeringState enum elsewhere? p.State = pbpeering.PeeringState_INITIAL p.CreateIndex = idx - - p.ID, err = generatePeeringUUID(tx) - if err != nil { - return fmt.Errorf("failed to generate peering id: %w", err) - } + p.ModifyIndex = idx } - p.ModifyIndex = idx if err := tx.Insert(tablePeering, p); err != nil { return fmt.Errorf("failed inserting peering: %w", err) diff --git a/agent/consul/state/peering_test.go b/agent/consul/state/peering_test.go index 4aba5c340..04389a8e9 100644 --- a/agent/consul/state/peering_test.go +++ b/agent/consul/state/peering_test.go @@ -1,13 +1,10 @@ package state import ( - "fmt" - "math/rand" "testing" "time" "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/acl" @@ -17,6 +14,12 @@ import ( "github.com/hashicorp/consul/sdk/testutil" ) +const ( + testFooPeerID = "9e650110-ac74-4c5a-a6a8-9348b2bed4e9" + testBarPeerID = "5ebcff30-5509-4858-8142-a8e580f1863f" + testBazPeerID = "432feb2f-5476-4ae2-b33c-e43640ca0e86" +) + func insertTestPeerings(t *testing.T, s *Store) { t.Helper() @@ -26,7 +29,7 @@ func insertTestPeerings(t *testing.T, s *Store) { err := tx.Insert(tablePeering, &pbpeering.Peering{ Name: "foo", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + ID: testFooPeerID, State: pbpeering.PeeringState_INITIAL, CreateIndex: 1, ModifyIndex: 1, @@ -36,7 +39,7 @@ func insertTestPeerings(t *testing.T, s *Store) { err = tx.Insert(tablePeering, &pbpeering.Peering{ Name: "bar", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "5ebcff30-5509-4858-8142-a8e580f1863f", + ID: testBarPeerID, State: pbpeering.PeeringState_FAILING, CreateIndex: 2, ModifyIndex: 2, @@ -97,16 +100,16 @@ func TestStateStore_PeeringReadByID(t *testing.T) { run := func(t *testing.T, tc testcase) { _, peering, err := s.PeeringReadByID(nil, tc.id) require.NoError(t, err) - require.Equal(t, tc.expect, peering) + prototest.AssertDeepEqual(t, tc.expect, peering) } tcs := []testcase{ { name: "get foo", - id: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + id: testFooPeerID, expect: &pbpeering.Peering{ Name: "foo", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + ID: testFooPeerID, State: pbpeering.PeeringState_INITIAL, CreateIndex: 1, ModifyIndex: 1, @@ -114,11 +117,11 @@ func TestStateStore_PeeringReadByID(t *testing.T) { }, { name: "get bar", - id: "5ebcff30-5509-4858-8142-a8e580f1863f", + id: testBarPeerID, expect: &pbpeering.Peering{ Name: "bar", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "5ebcff30-5509-4858-8142-a8e580f1863f", + ID: testBarPeerID, State: pbpeering.PeeringState_FAILING, CreateIndex: 2, ModifyIndex: 2, @@ -149,7 +152,7 @@ func TestStateStore_PeeringRead(t *testing.T) { run := func(t *testing.T, tc testcase) { _, peering, err := s.PeeringRead(nil, tc.query) require.NoError(t, err) - require.Equal(t, tc.expect, peering) + prototest.AssertDeepEqual(t, tc.expect, peering) } tcs := []testcase{ { @@ -160,7 +163,7 @@ func TestStateStore_PeeringRead(t *testing.T) { expect: &pbpeering.Peering{ Name: "foo", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + ID: testFooPeerID, State: pbpeering.PeeringState_INITIAL, CreateIndex: 1, ModifyIndex: 1, @@ -189,6 +192,7 @@ func TestStore_Peering_Watch(t *testing.T) { // set up initial write err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", }) require.NoError(t, err) @@ -210,6 +214,7 @@ func TestStore_Peering_Watch(t *testing.T) { lastIdx++ err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testBarPeerID, Name: "bar", }) require.NoError(t, err) @@ -229,6 +234,7 @@ func TestStore_Peering_Watch(t *testing.T) { // unrelated write shouldn't fire watch lastIdx++ err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testBarPeerID, Name: "bar", }) require.NoError(t, err) @@ -237,6 +243,7 @@ func TestStore_Peering_Watch(t *testing.T) { // foo write should fire watch lastIdx++ err = s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", DeletedAt: structs.TimeToProto(time.Now()), }) @@ -261,6 +268,7 @@ func TestStore_Peering_Watch(t *testing.T) { // mark for deletion before actually deleting lastIdx++ err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testBarPeerID, Name: "bar", DeletedAt: structs.TimeToProto(time.Now()), }) @@ -293,7 +301,7 @@ func TestStore_PeeringList(t *testing.T) { { Name: "foo", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + ID: testFooPeerID, State: pbpeering.PeeringState_INITIAL, CreateIndex: 1, ModifyIndex: 1, @@ -301,7 +309,7 @@ func TestStore_PeeringList(t *testing.T) { { Name: "bar", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "5ebcff30-5509-4858-8142-a8e580f1863f", + ID: testBarPeerID, State: pbpeering.PeeringState_FAILING, CreateIndex: 2, ModifyIndex: 2, @@ -336,6 +344,7 @@ func TestStore_PeeringList_Watch(t *testing.T) { lastIdx++ // insert a peering err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), }) @@ -357,6 +366,7 @@ func TestStore_PeeringList_Watch(t *testing.T) { // update peering lastIdx++ require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", DeletedAt: structs.TimeToProto(time.Now()), Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), @@ -422,6 +432,7 @@ func TestStore_PeeringWrite(t *testing.T) { { name: "create baz", input: &pbpeering.Peering{ + ID: testBazPeerID, Name: "baz", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), }, @@ -429,6 +440,7 @@ func TestStore_PeeringWrite(t *testing.T) { { name: "update baz", input: &pbpeering.Peering{ + ID: testBazPeerID, Name: "baz", State: pbpeering.PeeringState_FAILING, Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), @@ -437,6 +449,7 @@ func TestStore_PeeringWrite(t *testing.T) { { name: "mark baz for deletion", input: &pbpeering.Peering{ + ID: testBazPeerID, Name: "baz", State: pbpeering.PeeringState_TERMINATED, DeletedAt: structs.TimeToProto(time.Now()), @@ -446,6 +459,7 @@ func TestStore_PeeringWrite(t *testing.T) { { name: "cannot update peering marked for deletion", input: &pbpeering.Peering{ + ID: testBazPeerID, Name: "baz", // Attempt to add metadata Meta: map[string]string{ @@ -458,6 +472,7 @@ func TestStore_PeeringWrite(t *testing.T) { { name: "cannot create peering marked for deletion", input: &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", DeletedAt: structs.TimeToProto(time.Now()), Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), @@ -472,54 +487,6 @@ func TestStore_PeeringWrite(t *testing.T) { } } -func TestStore_PeeringWrite_GenerateUUID(t *testing.T) { - rand.Seed(1) - - s := NewStateStore(nil) - - entMeta := structs.NodeEnterpriseMetaInDefaultPartition() - partition := entMeta.PartitionOrDefault() - - for i := 1; i < 11; i++ { - require.NoError(t, s.PeeringWrite(uint64(i), &pbpeering.Peering{ - Name: fmt.Sprintf("peering-%d", i), - Partition: partition, - })) - } - - idx, peerings, err := s.PeeringList(nil, *entMeta) - require.NoError(t, err) - require.Equal(t, uint64(10), idx) - require.Len(t, peerings, 10) - - // Ensure that all assigned UUIDs are unique. - uniq := make(map[string]struct{}) - for _, p := range peerings { - uniq[p.ID] = struct{}{} - } - require.Len(t, uniq, 10) - - // Ensure that the ID of an existing peering cannot be overwritten. - updated := &pbpeering.Peering{ - Name: peerings[0].Name, - Partition: peerings[0].Partition, - } - - // Attempt to overwrite ID. - updated.ID, err = uuid.GenerateUUID() - require.NoError(t, err) - require.NoError(t, s.PeeringWrite(11, updated)) - - q := Query{ - Value: updated.Name, - EnterpriseMeta: *entMeta, - } - idx, got, err := s.PeeringRead(nil, q) - require.NoError(t, err) - require.Equal(t, uint64(11), idx) - require.Equal(t, peerings[0].ID, got.ID) -} - func TestStore_PeeringDelete(t *testing.T) { s := NewStateStore(nil) insertTestPeerings(t, s) @@ -532,6 +499,7 @@ func TestStore_PeeringDelete(t *testing.T) { testutil.RunStep(t, "can delete after marking for deletion", func(t *testing.T) { require.NoError(t, s.PeeringWrite(11, &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", DeletedAt: structs.TimeToProto(time.Now()), })) @@ -550,7 +518,7 @@ func TestStore_PeeringTerminateByID(t *testing.T) { insertTestPeerings(t, s) // id corresponding to default/foo - id := "9e650110-ac74-4c5a-a6a8-9348b2bed4e9" + const id = testFooPeerID require.NoError(t, s.PeeringTerminateByID(10, id)) @@ -607,7 +575,7 @@ func TestStateStore_PeeringTrustBundleRead(t *testing.T) { run := func(t *testing.T, tc testcase) { _, ptb, err := s.PeeringTrustBundleRead(nil, tc.query) require.NoError(t, err) - require.Equal(t, tc.expect, ptb) + prototest.AssertDeepEqual(t, tc.expect, ptb) } entMeta := structs.NodeEnterpriseMetaInDefaultPartition() @@ -708,6 +676,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { lastIdx++ require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testUUID(), Name: "my-peering", })) @@ -1000,6 +969,9 @@ func TestStateStore_PeeringsForService(t *testing.T) { var lastIdx uint64 // Create peerings for _, tp := range tc.peerings { + if tp.peering.ID == "" { + tp.peering.ID = testUUID() + } lastIdx++ require.NoError(t, s.PeeringWrite(lastIdx, tp.peering)) @@ -1009,6 +981,7 @@ func TestStateStore_PeeringsForService(t *testing.T) { lastIdx++ copied := pbpeering.Peering{ + ID: tp.peering.ID, Name: tp.peering.Name, DeletedAt: structs.TimeToProto(time.Now()), } @@ -1247,6 +1220,11 @@ func TestStore_TrustBundleListByService(t *testing.T) { var lastIdx uint64 ws := memdb.NewWatchSet() + var ( + peerID1 = testUUID() + peerID2 = testUUID() + ) + testutil.RunStep(t, "no results on initial setup", func(t *testing.T) { idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) require.NoError(t, err) @@ -1279,6 +1257,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { testutil.RunStep(t, "creating peering does not yield trust bundles", func(t *testing.T) { lastIdx++ require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID1, Name: "peer1", })) @@ -1377,6 +1356,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { testutil.RunStep(t, "bundles for other peers are ignored", func(t *testing.T) { lastIdx++ require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID2, Name: "peer2", })) @@ -1431,6 +1411,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { testutil.RunStep(t, "deleting the peering excludes its trust bundle", func(t *testing.T) { lastIdx++ require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID1, Name: "peer1", DeletedAt: structs.TimeToProto(time.Now()), })) @@ -1470,7 +1451,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) { err := tx.Insert(tablePeering, &pbpeering.Peering{ Name: "foo", Partition: acl.DefaultPartitionName, - ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + ID: testFooPeerID, DeletedAt: structs.TimeToProto(time.Now()), CreateIndex: 1, ModifyIndex: 1, @@ -1480,7 +1461,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) { err = tx.Insert(tablePeering, &pbpeering.Peering{ Name: "bar", Partition: acl.DefaultPartitionName, - ID: "5ebcff30-5509-4858-8142-a8e580f1863f", + ID: testBarPeerID, CreateIndex: 2, ModifyIndex: 2, }) @@ -1489,7 +1470,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) { err = tx.Insert(tablePeering, &pbpeering.Peering{ Name: "baz", Partition: acl.DefaultPartitionName, - ID: "432feb2f-5476-4ae2-b33c-e43640ca0e86", + ID: testBazPeerID, DeletedAt: structs.TimeToProto(time.Now()), CreateIndex: 3, ModifyIndex: 3, diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 6d8de85d3..5638702aa 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -24,6 +24,7 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/dns" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/proto/pbpeering" ) @@ -140,6 +141,7 @@ type Store interface { // Apply provides a write-only interface for persisting Peering data. type Apply interface { + CheckPeeringUUID(id string) (bool, error) PeeringWrite(req *pbpeering.PeeringWriteRequest) error PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error @@ -189,8 +191,16 @@ func (s *Service) GenerateToken( return nil, err } + canRetry := true +RETRY_ONCE: + id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition) + if err != nil { + return nil, err + } + writeReq := pbpeering.PeeringWriteRequest{ Peering: &pbpeering.Peering{ + ID: id, Name: req.PeerName, // TODO(peering): Normalize from ACL token once this endpoint is guarded by ACLs. Partition: req.PartitionOrDefault(), @@ -198,6 +208,15 @@ func (s *Service) GenerateToken( }, } if err := s.Backend.Apply().PeeringWrite(&writeReq); err != nil { + // There's a possible race where two servers call Generate Token at the + // same time with the same peer name for the first time. They both + // generate an ID and try to insert and only one wins. This detects the + // collision and forces the loser to discard its generated ID and use + // the one from the other server. + if canRetry && strings.Contains(err.Error(), "A peering already exists with the name") { + canRetry = false + goto RETRY_ONCE + } return nil, fmt.Errorf("failed to write peering: %w", err) } @@ -270,6 +289,11 @@ func (s *Service) Establish( serverAddrs[i] = addr } + id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition) + if err != nil { + return nil, err + } + // as soon as a peering is written with a list of ServerAddresses that is // non-empty, the leader routine will see the peering and attempt to // establish a connection with the remote peer. @@ -278,6 +302,7 @@ func (s *Service) Establish( // RemotePeerID(PeerID) but at this point the other peer does not. writeReq := &pbpeering.PeeringWriteRequest{ Peering: &pbpeering.Peering{ + ID: id, Name: req.PeerName, PeerCAPems: tok.CA, PeerServerAddresses: serverAddrs, @@ -368,6 +393,16 @@ func (s *Service) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteR defer metrics.MeasureSince([]string{"peering", "write"}, time.Now()) // TODO(peering): ACL check request token + if req.Peering == nil { + return nil, fmt.Errorf("missing required peering body") + } + + id, err := s.getExistingOrCreateNewPeerID(req.Peering.Name, req.Peering.Partition) + if err != nil { + return nil, err + } + req.Peering.ID = id + // TODO(peering): handle blocking queries err = s.Backend.Apply().PeeringWrite(req) if err != nil { @@ -418,6 +453,7 @@ func (s *Service) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelet // We only need to include the name and partition for the peering to be identified. // All other data associated with the peering can be discarded because once marked // for deletion the peering is effectively gone. + ID: existing.ID, Name: req.Name, Partition: req.Partition, DeletedAt: structs.TimeToProto(time.Now().UTC()), @@ -837,6 +873,26 @@ func getTrustDomain(store Store, logger hclog.Logger) (string, error) { return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil } +func (s *Service) getExistingOrCreateNewPeerID(peerName, partition string) (string, error) { + q := state.Query{ + Value: strings.ToLower(peerName), + EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(partition), + } + _, peering, err := s.Backend.Store().PeeringRead(nil, q) + if err != nil { + return "", err + } + if peering != nil { + return peering.ID, nil + } + + id, err := lib.GenerateUUID(s.Backend.Apply().CheckPeeringUUID) + if err != nil { + return "", err + } + return id, nil +} + func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) { return s.streams.streamStatus(peer) } diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index aba7973d0..af089f56c 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -30,6 +30,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/prototest" @@ -224,6 +225,7 @@ func TestPeeringService_Read(t *testing.T) { // insert peering directly to state store p := &pbpeering.Peering{ + ID: testUUID(t), Name: "foo", State: pbpeering.PeeringState_INITIAL, PeerCAPems: nil, @@ -279,6 +281,7 @@ func TestPeeringService_Delete(t *testing.T) { s := newTestServer(t, nil) p := &pbpeering.Peering{ + ID: testUUID(t), Name: "foo", State: pbpeering.PeeringState_INITIAL, PeerCAPems: nil, @@ -316,6 +319,7 @@ func TestPeeringService_List(t *testing.T) { // Note that the state store holds reference to the underlying // variables; do not modify them after writing. foo := &pbpeering.Peering{ + ID: testUUID(t), Name: "foo", State: pbpeering.PeeringState_INITIAL, PeerCAPems: nil, @@ -324,6 +328,7 @@ func TestPeeringService_List(t *testing.T) { } require.NoError(t, s.Server.FSM().State().PeeringWrite(10, foo)) bar := &pbpeering.Peering{ + ID: testUUID(t), Name: "bar", State: pbpeering.PeeringState_ACTIVE, PeerCAPems: nil, @@ -405,6 +410,7 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) { lastIdx++ require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testUUID(t), Name: "foo", State: pbpeering.PeeringState_INITIAL, PeerServerName: "test", @@ -413,6 +419,7 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) { lastIdx++ require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testUUID(t), Name: "bar", State: pbpeering.PeeringState_INITIAL, PeerServerName: "test-bar", @@ -513,6 +520,7 @@ func Test_StreamHandler_UpsertServices(t *testing.T) { ) require.NoError(t, s.Server.FSM().State().PeeringWrite(0, &pbpeering.Peering{ + ID: testUUID(t), Name: "my-peer", })) @@ -998,7 +1006,9 @@ func newDefaultDeps(t *testing.T, c *consul.Config) consul.Deps { } func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string { + t.Helper() err := store.PeeringWrite(index, &pbpeering.Peering{ + ID: testUUID(t), Name: name, }) require.NoError(t, err) @@ -1009,3 +1019,9 @@ func setupTestPeering(t *testing.T, store *state.Store, name string, index uint6 return p.ID } + +func testUUID(t *testing.T) string { + v, err := lib.GenerateUUID(nil) + require.NoError(t, err) + return v +} diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go index dc30fa686..9bc8eff4e 100644 --- a/agent/rpc/peering/stream_test.go +++ b/agent/rpc/peering/stream_test.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbservice" @@ -1030,6 +1031,10 @@ type testApplier struct { store *state.Store } +func (a *testApplier) CheckPeeringUUID(id string) (bool, error) { + panic("not implemented") +} + func (a *testApplier) PeeringWrite(req *pbpeering.PeeringWriteRequest) error { panic("not implemented") } @@ -1216,6 +1221,7 @@ func writeEstablishedPeering(t *testing.T, store *state.Store, idx uint64, peerN require.NoError(t, err) peering := pbpeering.Peering{ + ID: testUUID(t), Name: peerName, PeerID: remotePeerID, } @@ -2169,5 +2175,10 @@ func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes) require.Equal(t, expect[i].Checks[j].PartitionOrDefault(), got[i].Checks[j].PartitionOrDefault(), "partition mismatch") } } - +} + +func testUUID(t *testing.T) string { + v, err := lib.GenerateUUID(nil) + require.NoError(t, err) + return v } diff --git a/agent/rpc/peering/subscription_manager_test.go b/agent/rpc/peering/subscription_manager_test.go index a7c49090b..d556ff23e 100644 --- a/agent/rpc/peering/subscription_manager_test.go +++ b/agent/rpc/peering/subscription_manager_test.go @@ -589,6 +589,7 @@ func (b *testSubscriptionBackend) ensureCARoots(t *testing.T, roots ...*structs. func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string { err := store.PeeringWrite(index, &pbpeering.Peering{ + ID: testUUID(t), Name: name, }) require.NoError(t, err)