Fix secondary dc connect CA roots watch issue
The general problem was that a the CA config which contained the trust domain was happening outside of the blocking mechanism so if the client started the blocking query before the primary dcs roots had been set then a state trust domain was being pushed down. This was fixed here but in the future we should probably fixup the CA initialization code to not initialize the CA config twice when it doesn’t need to.
This commit is contained in:
parent
44dea31d1f
commit
03ccc7c5ae
|
@ -316,41 +316,25 @@ func (s *ConnectCA) Roots(
|
|||
return ErrConnectNotEnabled
|
||||
}
|
||||
|
||||
// Load the ClusterID to generate TrustDomain. We do this outside the loop
|
||||
// since by definition this value should be immutable once set for lifetime of
|
||||
// the cluster so we don't need to look it up more than once. We also don't
|
||||
// have to worry about non-atomicity between the config fetch transaction and
|
||||
// the CARoots transaction below since this field must remain immutable. Do
|
||||
// not re-use this state/config for other logic that might care about changes
|
||||
// of config during the blocking query below.
|
||||
{
|
||||
state := s.srv.fsm.State()
|
||||
_, config, err := state.CAConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check CA is actually bootstrapped...
|
||||
if config != nil {
|
||||
// Build TrustDomain based on the ClusterID stored.
|
||||
signingID := connect.SpiffeIDSigningForCluster(config)
|
||||
if signingID == nil {
|
||||
// If CA is bootstrapped at all then this should never happen but be
|
||||
// defensive.
|
||||
return errors.New("no cluster trust domain setup")
|
||||
}
|
||||
reply.TrustDomain = signingID.Host()
|
||||
}
|
||||
}
|
||||
|
||||
return s.srv.blockingQuery(
|
||||
&args.QueryOptions, &reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, roots, err := state.CARoots(ws)
|
||||
index, roots, config, err := state.CARootsAndConfig(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config != nil {
|
||||
// Build TrustDomain based on the ClusterID stored.
|
||||
signingID := connect.SpiffeIDSigningForCluster(config)
|
||||
if signingID == nil {
|
||||
// If CA is bootstrapped at all then this should never happen but be
|
||||
// defensive.
|
||||
return errors.New("no cluster trust domain setup")
|
||||
}
|
||||
reply.TrustDomain = signingID.Host()
|
||||
}
|
||||
|
||||
reply.Index, reply.Roots = index, roots
|
||||
if reply.Roots == nil {
|
||||
reply.Roots = make(structs.CARoots, 0)
|
||||
|
@ -415,6 +399,8 @@ func (s *ConnectCA) Sign(
|
|||
provider, caRoot := s.srv.getCAProvider()
|
||||
if provider == nil {
|
||||
return fmt.Errorf("internal error: CA provider is nil")
|
||||
} else if caRoot == nil {
|
||||
return fmt.Errorf("internal error: CA root is nil")
|
||||
}
|
||||
|
||||
// Verify that the CSR entity is in the cluster's trust domain
|
||||
|
|
|
@ -112,11 +112,17 @@ func (s *Store) CAConfig() (uint64, *structs.CAConfiguration, error) {
|
|||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
return s.caConfigTxn(tx, nil)
|
||||
}
|
||||
|
||||
func (s *Store) caConfigTxn(tx *memdb.Txn, ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) {
|
||||
// Get the CA config
|
||||
c, err := tx.First(caConfigTableName, "id")
|
||||
ch, c, err := tx.FirstWatch(caConfigTableName, "id")
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed CA config lookup: %s", err)
|
||||
}
|
||||
|
||||
ws.Add(ch)
|
||||
|
||||
config, ok := c.(*structs.CAConfiguration)
|
||||
if !ok {
|
||||
|
@ -156,7 +162,7 @@ func (s *Store) CACheckAndSetConfig(idx, cidx uint64, config *structs.CAConfigur
|
|||
// index arg, then we shouldn't update anything and can safely
|
||||
// return early here.
|
||||
e, ok := existing.(*structs.CAConfiguration)
|
||||
if !ok || e.ModifyIndex != cidx {
|
||||
if (ok && e.ModifyIndex != cidx) || (!ok && cidx != 0) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
@ -188,6 +194,7 @@ func (s *Store) caSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.CAConf
|
|||
}
|
||||
config.ModifyIndex = idx
|
||||
|
||||
fmt.Printf("\n\nInserting CA Config: %#v", config)
|
||||
if err := tx.Insert(caConfigTableName, config); err != nil {
|
||||
return fmt.Errorf("failed updating CA config: %s", err)
|
||||
}
|
||||
|
@ -227,6 +234,10 @@ func (s *Store) CARoots(ws memdb.WatchSet) (uint64, structs.CARoots, error) {
|
|||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
return s.caRootsTxn(tx, ws)
|
||||
}
|
||||
|
||||
func (s *Store) caRootsTxn(tx *memdb.Txn, ws memdb.WatchSet) (uint64, structs.CARoots, error) {
|
||||
// Get the index
|
||||
idx := maxIndexTxn(tx, caRootTableName)
|
||||
|
||||
|
@ -268,6 +279,7 @@ func (s *Store) CARootActive(ws memdb.WatchSet) (uint64, *structs.CARoot, error)
|
|||
//
|
||||
// The first boolean result returns whether the transaction succeeded or not.
|
||||
func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, error) {
|
||||
fmt.Printf("\n\nSetting the CA Roots: idx: %d, cidx: %d - %#v\n", idx, cidx, rs)
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -315,6 +327,7 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro
|
|||
|
||||
// Insert all
|
||||
for _, r := range rs {
|
||||
fmt.Printf("Inserting CA Root: %#v\n", r)
|
||||
if err := tx.Insert(caRootTableName, r); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -325,6 +338,7 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro
|
|||
return false, fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
fmt.Printf("\n\n")
|
||||
tx.Commit()
|
||||
return true, nil
|
||||
}
|
||||
|
@ -450,3 +464,25 @@ func (s *Store) CALeafSetIndex(index uint64) error {
|
|||
|
||||
return indexUpdateMaxTxn(tx, index, caLeafIndexName)
|
||||
}
|
||||
|
||||
func (s *Store) CARootsAndConfig(ws memdb.WatchSet) (uint64, structs.CARoots, *structs.CAConfiguration, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
confIdx, config, err := s.caConfigTxn(tx, ws)
|
||||
if err != nil {
|
||||
return 0, nil, nil, fmt.Errorf("failed CA config lookup: %v", err)
|
||||
}
|
||||
|
||||
rootsIdx, roots, err := s.caRootsTxn(tx, ws)
|
||||
if err != nil {
|
||||
return 0, nil, nil, fmt.Errorf("failed CA roots lookup: %v", err)
|
||||
}
|
||||
|
||||
idx := rootsIdx
|
||||
if confIdx > idx {
|
||||
idx = confIdx
|
||||
}
|
||||
|
||||
return idx, roots, config, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue