state: Move UpstreamDownstream to state package

This commit is contained in:
Daniel Nephin 2021-03-16 13:37:44 -04:00
parent ca3686f4aa
commit d77bdd26c5
4 changed files with 25 additions and 26 deletions

View File

@ -3008,7 +3008,7 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se
resp []structs.ServiceName resp []structs.ServiceName
) )
for raw := iter.Next(); raw != nil; raw = iter.Next() { for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(*structs.UpstreamDownstream) entry := raw.(*upstreamDownstream)
if entry.ModifyIndex > idx { if entry.ModifyIndex > idx {
idx = entry.ModifyIndex idx = entry.ModifyIndex
} }
@ -3060,13 +3060,13 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS
sid := svc.CompoundServiceID() sid := svc.CompoundServiceID()
uid := structs.UniqueID(node, sid.String()) uid := structs.UniqueID(node, sid.String())
var mapping *structs.UpstreamDownstream var mapping *upstreamDownstream
if existing, ok := obj.(*structs.UpstreamDownstream); ok { if existing, ok := obj.(*upstreamDownstream); ok {
rawCopy, err := copystructure.Copy(existing) rawCopy, err := copystructure.Copy(existing)
if err != nil { if err != nil {
return fmt.Errorf("failed to copy existing topology mapping: %v", err) return fmt.Errorf("failed to copy existing topology mapping: %v", err)
} }
mapping, ok = rawCopy.(*structs.UpstreamDownstream) mapping, ok = rawCopy.(*upstreamDownstream)
if !ok { if !ok {
return fmt.Errorf("unexpected topology type %T", rawCopy) return fmt.Errorf("unexpected topology type %T", rawCopy)
} }
@ -3076,7 +3076,7 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS
inserted[upstream] = true inserted[upstream] = true
} }
if mapping == nil { if mapping == nil {
mapping = &structs.UpstreamDownstream{ mapping = &upstreamDownstream{
Upstream: upstream, Upstream: upstream,
Downstream: downstream, Downstream: downstream,
Refs: map[string]struct{}{uid: {}}, Refs: map[string]struct{}{uid: {}},
@ -3124,9 +3124,9 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode)
return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err) return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err)
} }
mappings := make([]*structs.UpstreamDownstream, 0) mappings := make([]*upstreamDownstream, 0)
for raw := iter.Next(); raw != nil; raw = iter.Next() { for raw := iter.Next(); raw != nil; raw = iter.Next() {
mappings = append(mappings, raw.(*structs.UpstreamDownstream)) mappings = append(mappings, raw.(*upstreamDownstream))
} }
// Do the updates in a separate loop so we don't trash the iterator. // Do the updates in a separate loop so we don't trash the iterator.
@ -3135,7 +3135,7 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode)
if err != nil { if err != nil {
return fmt.Errorf("failed to copy existing topology mapping: %v", err) return fmt.Errorf("failed to copy existing topology mapping: %v", err)
} }
copy, ok := rawCopy.(*structs.UpstreamDownstream) copy, ok := rawCopy.(*upstreamDownstream)
if !ok { if !ok {
return fmt.Errorf("unexpected topology type %T", rawCopy) return fmt.Errorf("unexpected topology type %T", rawCopy)
} }
@ -3169,7 +3169,7 @@ func insertGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga
return nil return nil
} }
mapping := structs.UpstreamDownstream{ mapping := upstreamDownstream{
Upstream: gs.Service, Upstream: gs.Service,
Downstream: gs.Gateway, Downstream: gs.Gateway,
RaftIndex: gs.RaftIndex, RaftIndex: gs.RaftIndex,

View File

@ -41,7 +41,7 @@ func testIndexerTableChecks() map[string]indexerTestCase {
} }
func testIndexerTableMeshTopology() map[string]indexerTestCase { func testIndexerTableMeshTopology() map[string]indexerTestCase {
obj := structs.UpstreamDownstream{ obj := upstreamDownstream{
Upstream: structs.ServiceName{Name: "UpStReAm"}, Upstream: structs.ServiceName{Name: "UpStReAm"},
Downstream: structs.ServiceName{Name: "DownStream"}, Downstream: structs.ServiceName{Name: "DownStream"},
} }

View File

@ -322,3 +322,18 @@ func (index *ServiceNameIndex) PrefixFromArgs(args ...interface{}) ([]byte, erro
} }
return val, nil return val, nil
} }
// upstreamDownstream pairs come from individual proxy registrations, which can be updated independently.
type upstreamDownstream struct {
Upstream structs.ServiceName
Downstream structs.ServiceName
// Refs stores the registrations that contain this pairing.
// When there are no remaining Refs, the upstreamDownstream can be deleted.
//
// Note: This map must be treated as immutable when accessed in MemDB.
// The entire upstreamDownstream structure must be deep copied on updates.
Refs map[string]struct{}
structs.RaftIndex
}

View File

@ -2474,19 +2474,3 @@ func (m MessageType) String() string {
return "Unknown(" + strconv.Itoa(int(m)) + ")" return "Unknown(" + strconv.Itoa(int(m)) + ")"
} }
// UpstreamDownstream pairs come from individual proxy registrations, which can be updated independently.
// TODO: move to state package
type UpstreamDownstream struct {
Upstream ServiceName
Downstream ServiceName
// Refs stores the registrations that contain this pairing.
// When there are no remaining Refs, the UpstreamDownstream can be deleted.
//
// Note: This map must be treated as immutable when accessed in MemDB.
// The entire UpstreamDownstream structure must be deep copied on updates.
Refs map[string]struct{}
RaftIndex
}