From 8654adfc53c16d850550538cfdcd89f7247c28a6 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 8 Apr 2021 18:58:15 -0400 Subject: [PATCH 1/2] Handle FSM.Apply errors in raftApply Previously we were inconsistently checking the response for errors. This PR moves the response-is-error check into raftApply, so that all callers can look at only the error response, instead of having to know that errors could come from two places. This should expose a few more errors that were previously hidden because in some calls to raftApply we were ignoring the response return value. Also handle errors more consistently. In some cases we would log the error before returning it. This can be very confusing because it can result in the same error being logged multiple times. Instead return a wrapped error. --- agent/consul/acl_endpoint.go | 80 +++++--------------- agent/consul/acl_endpoint_legacy.go | 12 +-- agent/consul/acl_replication_types.go | 67 +++------------- agent/consul/acl_token_exp.go | 9 +-- agent/consul/catalog_endpoint.go | 25 +++--- agent/consul/config_endpoint.go | 13 +--- agent/consul/config_replication.go | 9 +-- agent/consul/consul_ca_delegate.go | 10 +-- agent/consul/coordinate_endpoint.go | 10 +-- agent/consul/federation_state_endpoint.go | 6 +- agent/consul/federation_state_replication.go | 12 +-- agent/consul/intention_endpoint.go | 18 ++--- agent/consul/kvs_endpoint.go | 11 +-- agent/consul/leader.go | 31 +++----- agent/consul/leader_connect.go | 11 +-- agent/consul/leader_connect_test.go | 10 +-- agent/consul/leader_federation_state_ae.go | 13 +--- agent/consul/leader_intentions.go | 7 +- agent/consul/leader_intentions_test.go | 15 +--- agent/consul/operator_autopilot_endpoint.go | 11 +-- agent/consul/prepared_query_endpoint.go | 16 ++-- agent/consul/rpc.go | 51 ++++++++----- agent/consul/session_endpoint.go | 14 ++-- agent/consul/system_metadata.go | 22 +----- agent/consul/txn_endpoint.go | 9 +-- 25 files changed, 142 insertions(+), 350 deletions(-) diff --git a/agent/consul/acl_endpoint.go b/agent/consul/acl_endpoint.go index b8ba08e0b..1a761fca2 100644 --- a/agent/consul/acl_endpoint.go +++ b/agent/consul/acl_endpoint.go @@ -13,16 +13,17 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-hclog" + memdb "github.com/hashicorp/go-memdb" + uuid "github.com/hashicorp/go-uuid" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/authmethod" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib/template" - "github.com/hashicorp/go-bexpr" - "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" - uuid "github.com/hashicorp/go-uuid" ) const ( @@ -250,15 +251,11 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC req.Token.SetHash(true) - resp, err := a.srv.raftApply(structs.ACLBootstrapRequestType, &req) + _, err = a.srv.raftApply(structs.ACLBootstrapRequestType, &req) if err != nil { return err } - if err, ok := resp.(error); ok { - return err - } - if _, token, err := state.ACLTokenGetByAccessor(nil, accessor, structs.DefaultEnterpriseMeta()); err == nil { *reply = *token } @@ -729,7 +726,7 @@ func (a *ACL) tokenSetInternal(args *structs.ACLTokenSetRequest, reply *structs. req.ProhibitUnprivileged = true } - resp, err := a.srv.raftApply(structs.ACLTokenSetRequestType, req) + _, err = a.srv.raftApply(structs.ACLTokenSetRequestType, req) if err != nil { return fmt.Errorf("Failed to apply token write request: %v", err) } @@ -737,10 +734,6 @@ func (a *ACL) tokenSetInternal(args *structs.ACLTokenSetRequest, reply *structs. // Purge the identity from the cache to prevent using the previous definition of the identity a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID)) - if respErr, ok := resp.(error); ok { - return respErr - } - // Don't check expiration times here as it doesn't really matter. if _, updatedToken, err := a.srv.fsm.State().ACLTokenGetByAccessor(nil, token.AccessorID, nil); err == nil && updatedToken != nil { *reply = *updatedToken @@ -885,7 +878,7 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er TokenIDs: []string{args.TokenID}, } - resp, err := a.srv.raftApply(structs.ACLTokenDeleteRequestType, req) + _, err = a.srv.raftApply(structs.ACLTokenDeleteRequestType, req) if err != nil { return fmt.Errorf("Failed to apply token delete request: %v", err) } @@ -893,10 +886,6 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er // Purge the identity from the cache to prevent using the previous definition of the identity a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID)) - if respErr, ok := resp.(error); ok { - return respErr - } - if reply != nil { *reply = token.AccessorID } @@ -1218,7 +1207,7 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol Policies: structs.ACLPolicies{policy}, } - resp, err := a.srv.raftApply(structs.ACLPolicySetRequestType, req) + _, err = a.srv.raftApply(structs.ACLPolicySetRequestType, req) if err != nil { return fmt.Errorf("Failed to apply policy upsert request: %v", err) } @@ -1226,10 +1215,6 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol // Remove from the cache to prevent stale cache usage a.srv.acls.cache.RemovePolicy(policy.ID) - if respErr, ok := resp.(error); ok { - return respErr - } - if _, policy, err := a.srv.fsm.State().ACLPolicyGetByID(nil, policy.ID, &policy.EnterpriseMeta); err == nil && policy != nil { *reply = *policy } @@ -1282,17 +1267,13 @@ func (a *ACL) PolicyDelete(args *structs.ACLPolicyDeleteRequest, reply *string) PolicyIDs: []string{args.PolicyID}, } - resp, err := a.srv.raftApply(structs.ACLPolicyDeleteRequestType, &req) + _, err = a.srv.raftApply(structs.ACLPolicyDeleteRequestType, &req) if err != nil { return fmt.Errorf("Failed to apply policy delete request: %v", err) } a.srv.acls.cache.RemovePolicy(policy.ID) - if respErr, ok := resp.(error); ok { - return respErr - } - *reply = policy.Name return nil @@ -1687,7 +1668,7 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e Roles: structs.ACLRoles{role}, } - resp, err := a.srv.raftApply(structs.ACLRoleSetRequestType, req) + _, err = a.srv.raftApply(structs.ACLRoleSetRequestType, req) if err != nil { return fmt.Errorf("Failed to apply role upsert request: %v", err) } @@ -1695,10 +1676,6 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e // Remove from the cache to prevent stale cache usage a.srv.acls.cache.RemoveRole(role.ID) - if respErr, ok := resp.(error); ok { - return respErr - } - if _, role, err := a.srv.fsm.State().ACLRoleGetByID(nil, role.ID, &role.EnterpriseMeta); err == nil && role != nil { *reply = *role } @@ -1747,17 +1724,13 @@ func (a *ACL) RoleDelete(args *structs.ACLRoleDeleteRequest, reply *string) erro RoleIDs: []string{args.RoleID}, } - resp, err := a.srv.raftApply(structs.ACLRoleDeleteRequestType, &req) + _, err = a.srv.raftApply(structs.ACLRoleDeleteRequestType, &req) if err != nil { return fmt.Errorf("Failed to apply role delete request: %v", err) } a.srv.acls.cache.RemoveRole(role.ID) - if respErr, ok := resp.(error); ok { - return respErr - } - *reply = role.Name return nil @@ -2014,13 +1987,10 @@ func (a *ACL) BindingRuleSet(args *structs.ACLBindingRuleSetRequest, reply *stru BindingRules: structs.ACLBindingRules{rule}, } - resp, err := a.srv.raftApply(structs.ACLBindingRuleSetRequestType, req) + _, err = a.srv.raftApply(structs.ACLBindingRuleSetRequestType, req) if err != nil { return fmt.Errorf("Failed to apply binding rule upsert request: %v", err) } - if respErr, ok := resp.(error); ok { - return fmt.Errorf("Failed to apply binding rule upsert request: %v", respErr) - } if _, rule, err := a.srv.fsm.State().ACLBindingRuleGetByID(nil, rule.ID, &rule.EnterpriseMeta); err == nil && rule != nil { *reply = *rule @@ -2070,15 +2040,11 @@ func (a *ACL) BindingRuleDelete(args *structs.ACLBindingRuleDeleteRequest, reply BindingRuleIDs: []string{args.BindingRuleID}, } - resp, err := a.srv.raftApply(structs.ACLBindingRuleDeleteRequestType, &req) + _, err = a.srv.raftApply(structs.ACLBindingRuleDeleteRequestType, &req) if err != nil { return fmt.Errorf("Failed to apply binding rule delete request: %v", err) } - if respErr, ok := resp.(error); ok { - return respErr - } - *reply = true return nil @@ -2266,15 +2232,11 @@ func (a *ACL) AuthMethodSet(args *structs.ACLAuthMethodSetRequest, reply *struct AuthMethods: structs.ACLAuthMethods{method}, } - resp, err := a.srv.raftApply(structs.ACLAuthMethodSetRequestType, req) + _, err = a.srv.raftApply(structs.ACLAuthMethodSetRequestType, req) if err != nil { return fmt.Errorf("Failed to apply auth method upsert request: %v", err) } - if respErr, ok := resp.(error); ok { - return respErr - } - if _, method, err := a.srv.fsm.State().ACLAuthMethodGetByName(nil, method.Name, &method.EnterpriseMeta); err == nil && method != nil { *reply = *method } @@ -2328,15 +2290,11 @@ func (a *ACL) AuthMethodDelete(args *structs.ACLAuthMethodDeleteRequest, reply * EnterpriseMeta: args.EnterpriseMeta, } - resp, err := a.srv.raftApply(structs.ACLAuthMethodDeleteRequestType, &req) + _, err = a.srv.raftApply(structs.ACLAuthMethodDeleteRequestType, &req) if err != nil { return fmt.Errorf("Failed to apply auth method delete request: %v", err) } - if respErr, ok := resp.(error); ok { - return respErr - } - *reply = true return nil @@ -2583,7 +2541,7 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error { TokenIDs: []string{token.AccessorID}, } - resp, err := a.srv.raftApply(structs.ACLTokenDeleteRequestType, req) + _, err = a.srv.raftApply(structs.ACLTokenDeleteRequestType, req) if err != nil { return fmt.Errorf("Failed to apply token delete request: %v", err) } @@ -2591,10 +2549,6 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error { // Purge the identity from the cache to prevent using the previous definition of the identity a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID)) - if respErr, ok := resp.(error); ok { - return respErr - } - *reply = true return nil diff --git a/agent/consul/acl_endpoint_legacy.go b/agent/consul/acl_endpoint_legacy.go index 9cdfba668..df7dcf8e5 100644 --- a/agent/consul/acl_endpoint_legacy.go +++ b/agent/consul/acl_endpoint_legacy.go @@ -6,11 +6,12 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" - "github.com/hashicorp/go-memdb" ) var ACLEndpointLegacySummaries = []prometheus.SummaryDefinition{ @@ -68,9 +69,6 @@ func (a *ACL) Bootstrap(args *structs.DCSpecificRequest, reply *structs.ACL) err return err } switch v := resp.(type) { - case error: - return v - case *structs.ACL: *reply = *v @@ -143,11 +141,7 @@ func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) erro // Apply the update resp, err := srv.raftApply(structs.ACLRequestType, args) if err != nil { - srv.logger.Error("Raft apply failed", "acl_op", args.Op, "error", err) - return err - } - if respErr, ok := resp.(error); ok { - return respErr + return fmt.Errorf("raft apply failed: %w", err) } // Check if the return type is a string diff --git a/agent/consul/acl_replication_types.go b/agent/consul/acl_replication_types.go index 06bfe4e50..6516a5c71 100644 --- a/agent/consul/acl_replication_types.go +++ b/agent/consul/acl_replication_types.go @@ -86,14 +86,8 @@ func (r *aclTokenReplicator) DeleteLocalBatch(srv *Server, batch []string) error TokenIDs: batch, } - resp, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - return nil + _, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req) + return err } func (r *aclTokenReplicator) LenPendingUpdates() int { @@ -116,15 +110,8 @@ func (r *aclTokenReplicator) UpdateLocalBatch(ctx context.Context, srv *Server, FromReplication: true, } - resp, err := srv.raftApply(structs.ACLTokenSetRequestType, &req) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - - return nil + _, err := srv.raftApply(structs.ACLTokenSetRequestType, &req) + return err } /////////////////////// @@ -199,14 +186,8 @@ func (r *aclPolicyReplicator) DeleteLocalBatch(srv *Server, batch []string) erro PolicyIDs: batch, } - resp, err := srv.raftApply(structs.ACLPolicyDeleteRequestType, &req) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - return nil + _, err := srv.raftApply(structs.ACLPolicyDeleteRequestType, &req) + return err } func (r *aclPolicyReplicator) LenPendingUpdates() int { @@ -226,16 +207,8 @@ func (r *aclPolicyReplicator) UpdateLocalBatch(ctx context.Context, srv *Server, Policies: r.updated[start:end], } - resp, err := srv.raftApply(structs.ACLPolicySetRequestType, &req) - if err != nil { - return err - } - - if respErr, ok := resp.(error); ok { - return respErr - } - - return nil + _, err := srv.raftApply(structs.ACLPolicySetRequestType, &req) + return err } //////////////////////////////// @@ -334,16 +307,8 @@ func (r *aclRoleReplicator) DeleteLocalBatch(srv *Server, batch []string) error RoleIDs: batch, } - resp, err := srv.raftApply(structs.ACLRoleDeleteRequestType, &req) - if err != nil { - return err - } - - if respErr, ok := resp.(error); ok { - return respErr - } - - return nil + _, err := srv.raftApply(structs.ACLRoleDeleteRequestType, &req) + return err } func (r *aclRoleReplicator) LenPendingUpdates() int { @@ -364,14 +329,6 @@ func (r *aclRoleReplicator) UpdateLocalBatch(ctx context.Context, srv *Server, s AllowMissingLinks: true, } - resp, err := srv.raftApply(structs.ACLRoleSetRequestType, &req) - if err != nil { - return err - } - - if respErr, ok := resp.(error); ok { - return respErr - } - - return nil + _, err := srv.raftApply(structs.ACLRoleSetRequestType, &req) + return err } diff --git a/agent/consul/acl_token_exp.go b/agent/consul/acl_token_exp.go index 4c0dbd2fe..ab2b35f08 100644 --- a/agent/consul/acl_token_exp.go +++ b/agent/consul/acl_token_exp.go @@ -5,8 +5,9 @@ import ( "fmt" "time" - "github.com/hashicorp/consul/agent/structs" "golang.org/x/time/rate" + + "github.com/hashicorp/consul/agent/structs" ) func (s *Server) reapExpiredTokens(ctx context.Context) error { @@ -102,7 +103,7 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) { "amount", len(req.TokenIDs), "locality", locality, ) - resp, err := s.raftApply(structs.ACLTokenDeleteRequestType, &req) + _, err = s.raftApply(structs.ACLTokenDeleteRequestType, &req) if err != nil { return 0, fmt.Errorf("Failed to apply token expiration deletions: %v", err) } @@ -112,10 +113,6 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) { s.acls.cache.RemoveIdentity(tokenSecretCacheID(secretID)) } - if respErr, ok := resp.(error); ok { - return 0, respErr - } - return len(req.TokenIDs), nil } diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index 9ee9fa9c0..e5d430992 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -7,15 +7,16 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + bexpr "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/types" - bexpr "github.com/hashicorp/go-bexpr" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-uuid" ) var CatalogCounters = []prometheus.CounterDefinition{ @@ -210,14 +211,8 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error } } - resp, err := c.srv.raftApply(structs.RegisterRequestType, args) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - return nil + _, err = c.srv.raftApply(structs.RegisterRequestType, args) + return err } // Deregister is used to remove a service registration for a given node. @@ -268,10 +263,8 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e } - if _, err := c.srv.raftApply(structs.DeregisterRequestType, args); err != nil { - return err - } - return nil + _, err = c.srv.raftApply(structs.DeregisterRequestType, args) + return err } // ListDatacenters is used to query for the list of known datacenters diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 6c4017daf..2061e76af 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -92,9 +92,6 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error if err != nil { return err } - if respErr, ok := resp.(error); ok { - return respErr - } if respBool, ok := resp.(bool); ok { *reply = respBool } @@ -296,14 +293,8 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) } args.Op = structs.ConfigEntryDelete - resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - return nil + _, err = c.srv.raftApply(structs.ConfigEntryRequestType, args) + return err } // ResolveServiceConfig diff --git a/agent/consul/config_replication.go b/agent/consul/config_replication.go index 3ab91be0c..536f75dc7 100644 --- a/agent/consul/config_replication.go +++ b/agent/consul/config_replication.go @@ -7,8 +7,9 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/agent/structs" ) func configSort(configs []structs.ConfigEntry) { @@ -97,15 +98,11 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con Entry: entry, } - resp, err := s.raftApply(structs.ConfigEntryRequestType, &req) + _, err := s.raftApply(structs.ConfigEntryRequestType, &req) if err != nil { return false, fmt.Errorf("Failed to apply config %s: %v", op, err) } - if respErr, ok := resp.(error); ok { - return false, fmt.Errorf("Failed to apply config %s: %v", op, respErr) - } - if i < len(configs)-1 { select { case <-ctx.Done(): diff --git a/agent/consul/consul_ca_delegate.go b/agent/consul/consul_ca_delegate.go index 9efe687ff..0f630a547 100644 --- a/agent/consul/consul_ca_delegate.go +++ b/agent/consul/consul_ca_delegate.go @@ -16,13 +16,5 @@ func (c *consulCADelegate) State() *state.Store { } func (c *consulCADelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) { - resp, err := c.srv.raftApply(structs.ConnectCARequestType, req) - if err != nil { - return nil, err - } - if respErr, ok := resp.(error); ok { - return nil, respErr - } - - return resp, nil + return c.srv.raftApply(structs.ConnectCARequestType, req) } diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index 984f317c4..2fbc10c70 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -6,13 +6,14 @@ import ( "sync" "time" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-memdb" ) // Coordinate manages queries and updates for network coordinates. @@ -105,13 +106,10 @@ func (c *Coordinate) batchApplyUpdates() error { t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag slice := updates[start:end] - resp, err := c.srv.raftApply(t, slice) + _, err := c.srv.raftApply(t, slice) if err != nil { return err } - if respErr, ok := resp.(error); ok { - return respErr - } } return nil } diff --git a/agent/consul/federation_state_endpoint.go b/agent/consul/federation_state_endpoint.go index 88111364c..f5fa358bb 100644 --- a/agent/consul/federation_state_endpoint.go +++ b/agent/consul/federation_state_endpoint.go @@ -7,10 +7,11 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - memdb "github.com/hashicorp/go-memdb" ) var FederationStateSummaries = []prometheus.SummaryDefinition{ @@ -85,9 +86,6 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo if err != nil { return err } - if respErr, ok := resp.(error); ok { - return respErr - } if respBool, ok := resp.(bool); ok { *reply = respBool } diff --git a/agent/consul/federation_state_replication.go b/agent/consul/federation_state_replication.go index 8d039b814..5379a1e28 100644 --- a/agent/consul/federation_state_replication.go +++ b/agent/consul/federation_state_replication.go @@ -154,15 +154,11 @@ func (r *FederationStateReplicator) PerformDeletions(ctx context.Context, deleti State: state, } - resp, err := r.srv.raftApply(structs.FederationStateRequestType, &req) + _, err := r.srv.raftApply(structs.FederationStateRequestType, &req) if err != nil { return false, err } - if respErr, ok := resp.(error); ok { - return false, respErr - } - if i < len(deletions)-1 { select { case <-ctx.Done(): @@ -199,15 +195,11 @@ func (r *FederationStateReplicator) PerformUpdates(ctx context.Context, updatesR State: state2, } - resp, err := r.srv.raftApply(structs.FederationStateRequestType, &req) + _, err := r.srv.raftApply(structs.FederationStateRequestType, &req) if err != nil { return false, err } - if respErr, ok := resp.(error); ok { - return false, respErr - } - if i < len(updates)-1 { select { case <-ctx.Done(): diff --git a/agent/consul/intention_endpoint.go b/agent/consul/intention_endpoint.go index cb6208293..83c7f4bcd 100644 --- a/agent/consul/intention_endpoint.go +++ b/agent/consul/intention_endpoint.go @@ -7,13 +7,14 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" - "github.com/hashicorp/go-bexpr" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-memdb" ) var IntentionSummaries = []prometheus.SummaryDefinition{ @@ -157,15 +158,8 @@ func (s *Intention) Apply(args *structs.IntentionRequest, reply *string) error { args.Mutation = mut args.Intention = nil - resp, err := s.srv.raftApply(structs.IntentionRequestType, args) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - - return nil + _, err = s.srv.raftApply(structs.IntentionRequestType, args) + return err } func (s *Intention) computeApplyChangesLegacyCreate( diff --git a/agent/consul/kvs_endpoint.go b/agent/consul/kvs_endpoint.go index c6aee9380..d52ba169c 100644 --- a/agent/consul/kvs_endpoint.go +++ b/agent/consul/kvs_endpoint.go @@ -7,12 +7,13 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-memdb" ) var KVSummaries = []prometheus.SummaryDefinition{ @@ -122,11 +123,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { // Apply the update. resp, err := k.srv.raftApply(structs.KVSRequestType, args) if err != nil { - k.logger.Error("Raft apply failed", "error", err) - return err - } - if respErr, ok := resp.(error); ok { - return respErr + return fmt.Errorf("raft apply failed: %w", err) } // Check if the return type is a bool. diff --git a/agent/consul/leader.go b/agent/consul/leader.go index e4470ac1f..5a573b25a 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -12,13 +12,6 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/metadata" - "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/lib" - "github.com/hashicorp/consul/logging" - "github.com/hashicorp/consul/types" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" @@ -26,6 +19,14 @@ import ( "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" "golang.org/x/time/rate" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/types" ) var LeaderSummaries = []prometheus.SummaryDefinition{ @@ -478,9 +479,6 @@ func (s *Server) initializeLegacyACL() error { return fmt.Errorf("failed to initialize ACL bootstrap: %v", err) } switch v := resp.(type) { - case error: - return fmt.Errorf("failed to initialize ACL bootstrap: %v", v) - case bool: if v { s.logger.Info("ACL bootstrap enabled") @@ -766,14 +764,10 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error { req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true} - resp, err := s.raftApply(structs.ACLTokenSetRequestType, req) + _, err = s.raftApply(structs.ACLTokenSetRequestType, req) if err != nil { s.logger.Error("failed to apply acl token upgrade batch", "error", err) } - - if err, ok := resp.(error); ok { - s.logger.Error("failed to apply acl token upgrade batch", "error", err) - } } } @@ -1088,12 +1082,7 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error { Entry: entry, } - resp, err := s.raftApply(structs.ConfigEntryRequestType, &req) - if err == nil { - if respErr, ok := resp.(error); ok { - err = respErr - } - } + _, err := s.raftApply(structs.ConfigEntryRequestType, &req) if err != nil { return fmt.Errorf("Failed to apply configuration entry %q / %q: %v", entry.GetKind(), entry.GetName(), err) } diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index 2104e7c9e..18ca3d114 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -135,15 +135,8 @@ func (s *Server) pruneCARoots() error { args.Op = structs.CAOpSetRoots args.Index = idx args.Roots = newRoots - resp, err := s.raftApply(structs.ConnectCARequestType, args) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - - return nil + _, err = s.raftApply(structs.ConnectCARequestType, args) + return err } // retryLoopBackoff loops a given function indefinitely, backing off exponentially diff --git a/agent/consul/leader_connect_test.go b/agent/consul/leader_connect_test.go index 181dc9378..527c41934 100644 --- a/agent/consul/leader_connect_test.go +++ b/agent/consul/leader_connect_test.go @@ -628,15 +628,12 @@ func TestLeader_Vault_PrimaryCA_FixSigningKeyID_OnRestart(t *testing.T) { activePrimaryRoot.SigningKeyID = primaryRootSigningKeyID // Store the root cert in raft - resp, err := s1pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{ + _, err = s1pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{ Op: structs.CAOpSetRoots, Index: idx, Roots: []*structs.CARoot{activePrimaryRoot}, }) require.NoError(t, err) - if respErr, ok := resp.(error); ok { - t.Fatalf("respErr: %v", respErr) - } } // Shutdown s1pre and restart it to trigger the secondary CA init to correct @@ -731,15 +728,12 @@ func TestLeader_SecondaryCA_FixSigningKeyID_via_IntermediateRefresh(t *testing.T activeSecondaryRoot.SigningKeyID = secondaryRootSigningKeyID // Store the root cert in raft - resp, err := s2pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{ + _, err = s2pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{ Op: structs.CAOpSetRoots, Index: idx, Roots: []*structs.CARoot{activeSecondaryRoot}, }) require.NoError(t, err) - if respErr, ok := resp.(error); ok { - t.Fatalf("respErr: %v", respErr) - } } // Shutdown s2pre and restart it to trigger the secondary CA init to correct diff --git a/agent/consul/leader_federation_state_ae.go b/agent/consul/leader_federation_state_ae.go index 5adf08f34..060962a70 100644 --- a/agent/consul/leader_federation_state_ae.go +++ b/agent/consul/leader_federation_state_ae.go @@ -5,9 +5,10 @@ import ( "fmt" "time" + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - memdb "github.com/hashicorp/go-memdb" ) const ( @@ -117,13 +118,10 @@ func (s *Server) updateOurFederationState(curr *structs.FederationState) error { if s.config.Datacenter == s.config.PrimaryDatacenter { // We are the primary, so we can't do an RPC as we don't have a replication token. - resp, err := s.raftApply(structs.FederationStateRequestType, args) + _, err := s.raftApply(structs.FederationStateRequestType, args) if err != nil { return err } - if respErr, ok := resp.(error); ok { - return respErr - } } else { args.WriteRequest = structs.WriteRequest{ Token: s.tokens.ReplicationToken(), @@ -225,13 +223,10 @@ func (s *Server) pruneStaleFederationStates() error { Datacenter: dc, }, } - resp, err := s.raftApply(structs.FederationStateRequestType, &req) + _, err := s.raftApply(structs.FederationStateRequestType, &req) if err != nil { return fmt.Errorf("Failed to delete federation state %s: %v", dc, err) } - if respErr, ok := resp.(error); ok { - return fmt.Errorf("Failed to delete federation state %s: %v", dc, respErr) - } } return nil diff --git a/agent/consul/leader_intentions.go b/agent/consul/leader_intentions.go index 5d31accca..4fedd2960 100644 --- a/agent/consul/leader_intentions.go +++ b/agent/consul/leader_intentions.go @@ -164,10 +164,8 @@ func (s *Server) legacyIntentionsMigrationCleanupPhase(quiet bool) error { req := structs.IntentionRequest{ Op: structs.IntentionOpDeleteAll, } - if resp, err := s.raftApply(structs.IntentionRequestType, req); err != nil { + if _, err := s.raftApply(structs.IntentionRequestType, req); err != nil { return err - } else if respErr, ok := resp.(error); ok { - return respErr } // Bypass the serf component and jump right to the final state. @@ -410,9 +408,6 @@ func (s *Server) replicateLegacyIntentionsOnce(ctx context.Context, lastFetchInd if err != nil { return 0, false, err } - if respErr, ok := resp.(error); ok { - return 0, false, respErr - } if txnResp, ok := resp.(structs.TxnResponse); ok { if len(txnResp.Errors) > 0 { diff --git a/agent/consul/leader_intentions_test.go b/agent/consul/leader_intentions_test.go index 1f810a7d7..02a24cf7f 100644 --- a/agent/consul/leader_intentions_test.go +++ b/agent/consul/leader_intentions_test.go @@ -108,14 +108,8 @@ func TestLeader_ReplicateIntentions(t *testing.T) { if req.Op != structs.IntentionOpDelete { req2.Intention.Hash = req.Intention.Hash // not part of Clone } - resp, err := s.raftApply(structs.IntentionRequestType, req2) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - return nil + _, err := s.raftApply(structs.IntentionRequestType, req2) + return err } // Directly insert legacy intentions into raft in dc1. @@ -442,14 +436,11 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) { var retained []*structs.Intention for _, ixn := range ixns { ixn2 := *ixn - resp, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{ + _, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{ Op: structs.IntentionOpCreate, Intention: &ixn2, }) require.NoError(t, err) - if respErr, ok := resp.(error); ok { - t.Fatalf("respErr: %v", respErr) - } if _, present := ixn.Meta["unit-test-discarded"]; !present { retained = append(retained, ixn) diff --git a/agent/consul/operator_autopilot_endpoint.go b/agent/consul/operator_autopilot_endpoint.go index 2b122e522..29cdbe912 100644 --- a/agent/consul/operator_autopilot_endpoint.go +++ b/agent/consul/operator_autopilot_endpoint.go @@ -3,10 +3,11 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/structs" autopilot "github.com/hashicorp/raft-autopilot" "github.com/hashicorp/serf/serf" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/structs" ) // AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. @@ -62,11 +63,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe // Apply the update resp, err := op.srv.raftApply(structs.AutopilotRequestType, args) if err != nil { - op.logger.Error("Raft apply failed", "error", err) - return err - } - if respErr, ok := resp.(error); ok { - return respErr + return fmt.Errorf("raft apply failed: %w", err) } // Check if the return type is a bool. diff --git a/agent/consul/prepared_query_endpoint.go b/agent/consul/prepared_query_endpoint.go index 360c80b9b..e4dc05e9a 100644 --- a/agent/consul/prepared_query_endpoint.go +++ b/agent/consul/prepared_query_endpoint.go @@ -7,13 +7,14 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/logging" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-uuid" ) var PreparedQuerySummaries = []prometheus.SummaryDefinition{ @@ -128,15 +129,10 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) } // Commit the query to the state store. - resp, err := p.srv.raftApply(structs.PreparedQueryRequestType, args) + _, err = p.srv.raftApply(structs.PreparedQueryRequestType, args) if err != nil { - p.logger.Error("Raft apply failed", "error", err) - return err + return fmt.Errorf("raft apply failed: %w", err) } - if respErr, ok := resp.(error); ok { - return respErr - } - return nil } diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index b72360259..ff7cf53d8 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -14,14 +14,6 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/state" - "github.com/hashicorp/consul/agent/consul/wanfed" - "github.com/hashicorp/consul/agent/metadata" - "github.com/hashicorp/consul/agent/pool" - "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/lib" - "github.com/hashicorp/consul/logging" connlimit "github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" @@ -30,6 +22,15 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/raft" "github.com/hashicorp/yamux" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/wanfed" + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/agent/pool" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/logging" ) var RPCCounters = []prometheus.CounterDefinition{ @@ -729,28 +730,34 @@ func (s *Server) keyringRPCs(method string, args interface{}, dcs []string) (*st type raftEncoder func(structs.MessageType, interface{}) ([]byte, error) -// raftApply is used to encode a message, run it through raft, and return -// the FSM response along with any errors +// raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See +// raftApplyWithEncoder. +// Deprecated: use raftApplyMsgpack func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) { return s.raftApplyMsgpack(t, msg) } -// raftApplyMsgpack will msgpack encode the request and then run it through raft, -// then return the FSM response along with any errors. +// raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See +// raftApplyWithEncoder. func (s *Server) raftApplyMsgpack(t structs.MessageType, msg interface{}) (interface{}, error) { return s.raftApplyWithEncoder(t, msg, structs.Encode) } -// raftApplyProtobuf will protobuf encode the request and then run it through raft, -// then return the FSM response along with any errors. +// raftApplyProtobuf encodes the msg using protobuf and calls raft.Apply. See +// raftApplyWithEncoder. func (s *Server) raftApplyProtobuf(t structs.MessageType, msg interface{}) (interface{}, error) { return s.raftApplyWithEncoder(t, msg, structs.EncodeProtoInterface) } -// raftApplyWithEncoder is used to encode a message, run it through raft, -// and return the FSM response along with any errors. Unlike raftApply this -// takes the encoder to use as an argument. -func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) { +// raftApplyWithEncoder encodes a message, and then calls raft.Apply with the +// encoded message. Returns the FSM response along with any errors. If the +// FSM.Apply response is an error it will be returned as the error return +// value with a nil response. +func (s *Server) raftApplyWithEncoder( + t structs.MessageType, + msg interface{}, + encoder raftEncoder, +) (response interface{}, err error) { if encoder == nil { return nil, fmt.Errorf("Failed to encode request: nil encoder") } @@ -789,17 +796,19 @@ func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, en // apply function. Downstream client code expects to see any error // from the FSM (as opposed to the apply itself) and decide whether // it can retry in the future's response. - return ErrChunkingResubmit, nil + return nil, ErrChunkingResubmit } // We expect that this conversion should always work chunkedSuccess, ok := resp.(raftchunking.ChunkingSuccess) if !ok { return nil, errors.New("unknown type of response back from chunking FSM") } - // Return the inner wrapped response - return chunkedSuccess.Response, nil + resp = chunkedSuccess.Response } + if err, ok := resp.(error); ok { + return nil, err + } return resp, nil } diff --git a/agent/consul/session_endpoint.go b/agent/consul/session_endpoint.go index d3d360488..1287ca796 100644 --- a/agent/consul/session_endpoint.go +++ b/agent/consul/session_endpoint.go @@ -6,12 +6,13 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/state" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" ) var SessionEndpointSummaries = []prometheus.SummaryDefinition{ @@ -147,8 +148,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { // Apply the update resp, err := s.srv.raftApply(structs.SessionRequestType, args) if err != nil { - s.logger.Error("Apply failed", "error", err) - return err + return fmt.Errorf("apply failed: %w", err) } if args.Op == structs.SessionCreate && args.Session.TTL != "" { @@ -160,10 +160,6 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { s.srv.clearSessionTimer(args.Session.ID) } - if respErr, ok := resp.(error); ok { - return respErr - } - // Check if the return type is a string if respString, ok := resp.(string); ok { *reply = respString diff --git a/agent/consul/system_metadata.go b/agent/consul/system_metadata.go index c630a8935..87832b81c 100644 --- a/agent/consul/system_metadata.go +++ b/agent/consul/system_metadata.go @@ -22,15 +22,8 @@ func (s *Server) setSystemMetadataKey(key, val string) error { Entry: &structs.SystemMetadataEntry{Key: key, Value: val}, } - resp, err := s.raftApply(structs.SystemMetadataRequestType, args) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - - return nil + _, err := s.raftApply(structs.SystemMetadataRequestType, args) + return err } func (s *Server) deleteSystemMetadataKey(key string) error { @@ -39,13 +32,6 @@ func (s *Server) deleteSystemMetadataKey(key string) error { Entry: &structs.SystemMetadataEntry{Key: key}, } - resp, err := s.raftApply(structs.SystemMetadataRequestType, args) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - - return nil + _, err := s.raftApply(structs.SystemMetadataRequestType, args) + return err } diff --git a/agent/consul/txn_endpoint.go b/agent/consul/txn_endpoint.go index 9febc8b89..da1a224cb 100644 --- a/agent/consul/txn_endpoint.go +++ b/agent/consul/txn_endpoint.go @@ -6,10 +6,11 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-hclog" ) var TxnSummaries = []prometheus.SummaryDefinition{ @@ -138,11 +139,7 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error // Apply the update. resp, err := t.srv.raftApply(structs.TxnRequestType, args) if err != nil { - t.logger.Error("Raft apply failed", "error", err) - return err - } - if respErr, ok := resp.(error); ok { - return respErr + return fmt.Errorf("raft apply failed: %w", err) } // Convert the return type. This should be a cheap copy since we are From 6d1a5b362928d9395569fda2e6e04484e0e2ea98 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 9 Apr 2021 13:34:09 -0400 Subject: [PATCH 2/2] Handle ErrChunkingResubmit.Error properly Previously canRetry was attempting to retrieve this error from args, however there was never any callers that would pass an error to args. With the change to raftApply to move this error to the error return value, it is now possible to receive this error from the err argument. This commit updates canRetry to check for ErrChunkingResubmit in err. --- agent/consul/rpc.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index ff7cf53d8..80d5b95e6 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -535,9 +535,8 @@ func canRetry(args interface{}, err error) bool { return true } - // If we are chunking and it doesn't seem to have completed, try again - intErr, ok := args.(error) - if ok && strings.Contains(intErr.Error(), ErrChunkingResubmit.Error()) { + // If we are chunking and it doesn't seem to have completed, try again. + if err != nil && strings.Contains(err.Error(), ErrChunkingResubmit.Error()) { return true }