Merge pull request #9991 from hashicorp/dnephin/handle-raft-apply-errors

Handle FSM.Apply errors in raftApply
This commit is contained in:
Daniel Nephin 2021-04-20 13:58:39 -04:00 committed by GitHub
commit 3f38964da6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 144 additions and 353 deletions

View File

@ -13,16 +13,17 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/authmethod"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/template"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
uuid "github.com/hashicorp/go-uuid"
)
const (
@ -250,15 +251,11 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC
req.Token.SetHash(true)
resp, err := a.srv.raftApply(structs.ACLBootstrapRequestType, &req)
_, err = a.srv.raftApply(structs.ACLBootstrapRequestType, &req)
if err != nil {
return err
}
if err, ok := resp.(error); ok {
return err
}
if _, token, err := state.ACLTokenGetByAccessor(nil, accessor, structs.DefaultEnterpriseMeta()); err == nil {
*reply = *token
}
@ -729,7 +726,7 @@ func (a *ACL) tokenSetInternal(args *structs.ACLTokenSetRequest, reply *structs.
req.ProhibitUnprivileged = true
}
resp, err := a.srv.raftApply(structs.ACLTokenSetRequestType, req)
_, err = a.srv.raftApply(structs.ACLTokenSetRequestType, req)
if err != nil {
return fmt.Errorf("Failed to apply token write request: %v", err)
}
@ -737,10 +734,6 @@ func (a *ACL) tokenSetInternal(args *structs.ACLTokenSetRequest, reply *structs.
// Purge the identity from the cache to prevent using the previous definition of the identity
a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID))
if respErr, ok := resp.(error); ok {
return respErr
}
// Don't check expiration times here as it doesn't really matter.
if _, updatedToken, err := a.srv.fsm.State().ACLTokenGetByAccessor(nil, token.AccessorID, nil); err == nil && updatedToken != nil {
*reply = *updatedToken
@ -885,7 +878,7 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er
TokenIDs: []string{args.TokenID},
}
resp, err := a.srv.raftApply(structs.ACLTokenDeleteRequestType, req)
_, err = a.srv.raftApply(structs.ACLTokenDeleteRequestType, req)
if err != nil {
return fmt.Errorf("Failed to apply token delete request: %v", err)
}
@ -893,10 +886,6 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er
// Purge the identity from the cache to prevent using the previous definition of the identity
a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID))
if respErr, ok := resp.(error); ok {
return respErr
}
if reply != nil {
*reply = token.AccessorID
}
@ -1218,7 +1207,7 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol
Policies: structs.ACLPolicies{policy},
}
resp, err := a.srv.raftApply(structs.ACLPolicySetRequestType, req)
_, err = a.srv.raftApply(structs.ACLPolicySetRequestType, req)
if err != nil {
return fmt.Errorf("Failed to apply policy upsert request: %v", err)
}
@ -1226,10 +1215,6 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol
// Remove from the cache to prevent stale cache usage
a.srv.acls.cache.RemovePolicy(policy.ID)
if respErr, ok := resp.(error); ok {
return respErr
}
if _, policy, err := a.srv.fsm.State().ACLPolicyGetByID(nil, policy.ID, &policy.EnterpriseMeta); err == nil && policy != nil {
*reply = *policy
}
@ -1282,17 +1267,13 @@ func (a *ACL) PolicyDelete(args *structs.ACLPolicyDeleteRequest, reply *string)
PolicyIDs: []string{args.PolicyID},
}
resp, err := a.srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
_, err = a.srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
if err != nil {
return fmt.Errorf("Failed to apply policy delete request: %v", err)
}
a.srv.acls.cache.RemovePolicy(policy.ID)
if respErr, ok := resp.(error); ok {
return respErr
}
*reply = policy.Name
return nil
@ -1687,7 +1668,7 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e
Roles: structs.ACLRoles{role},
}
resp, err := a.srv.raftApply(structs.ACLRoleSetRequestType, req)
_, err = a.srv.raftApply(structs.ACLRoleSetRequestType, req)
if err != nil {
return fmt.Errorf("Failed to apply role upsert request: %v", err)
}
@ -1695,10 +1676,6 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e
// Remove from the cache to prevent stale cache usage
a.srv.acls.cache.RemoveRole(role.ID)
if respErr, ok := resp.(error); ok {
return respErr
}
if _, role, err := a.srv.fsm.State().ACLRoleGetByID(nil, role.ID, &role.EnterpriseMeta); err == nil && role != nil {
*reply = *role
}
@ -1747,17 +1724,13 @@ func (a *ACL) RoleDelete(args *structs.ACLRoleDeleteRequest, reply *string) erro
RoleIDs: []string{args.RoleID},
}
resp, err := a.srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
_, err = a.srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
if err != nil {
return fmt.Errorf("Failed to apply role delete request: %v", err)
}
a.srv.acls.cache.RemoveRole(role.ID)
if respErr, ok := resp.(error); ok {
return respErr
}
*reply = role.Name
return nil
@ -2014,13 +1987,10 @@ func (a *ACL) BindingRuleSet(args *structs.ACLBindingRuleSetRequest, reply *stru
BindingRules: structs.ACLBindingRules{rule},
}
resp, err := a.srv.raftApply(structs.ACLBindingRuleSetRequestType, req)
_, err = a.srv.raftApply(structs.ACLBindingRuleSetRequestType, req)
if err != nil {
return fmt.Errorf("Failed to apply binding rule upsert request: %v", err)
}
if respErr, ok := resp.(error); ok {
return fmt.Errorf("Failed to apply binding rule upsert request: %v", respErr)
}
if _, rule, err := a.srv.fsm.State().ACLBindingRuleGetByID(nil, rule.ID, &rule.EnterpriseMeta); err == nil && rule != nil {
*reply = *rule
@ -2070,15 +2040,11 @@ func (a *ACL) BindingRuleDelete(args *structs.ACLBindingRuleDeleteRequest, reply
BindingRuleIDs: []string{args.BindingRuleID},
}
resp, err := a.srv.raftApply(structs.ACLBindingRuleDeleteRequestType, &req)
_, err = a.srv.raftApply(structs.ACLBindingRuleDeleteRequestType, &req)
if err != nil {
return fmt.Errorf("Failed to apply binding rule delete request: %v", err)
}
if respErr, ok := resp.(error); ok {
return respErr
}
*reply = true
return nil
@ -2266,15 +2232,11 @@ func (a *ACL) AuthMethodSet(args *structs.ACLAuthMethodSetRequest, reply *struct
AuthMethods: structs.ACLAuthMethods{method},
}
resp, err := a.srv.raftApply(structs.ACLAuthMethodSetRequestType, req)
_, err = a.srv.raftApply(structs.ACLAuthMethodSetRequestType, req)
if err != nil {
return fmt.Errorf("Failed to apply auth method upsert request: %v", err)
}
if respErr, ok := resp.(error); ok {
return respErr
}
if _, method, err := a.srv.fsm.State().ACLAuthMethodGetByName(nil, method.Name, &method.EnterpriseMeta); err == nil && method != nil {
*reply = *method
}
@ -2328,15 +2290,11 @@ func (a *ACL) AuthMethodDelete(args *structs.ACLAuthMethodDeleteRequest, reply *
EnterpriseMeta: args.EnterpriseMeta,
}
resp, err := a.srv.raftApply(structs.ACLAuthMethodDeleteRequestType, &req)
_, err = a.srv.raftApply(structs.ACLAuthMethodDeleteRequestType, &req)
if err != nil {
return fmt.Errorf("Failed to apply auth method delete request: %v", err)
}
if respErr, ok := resp.(error); ok {
return respErr
}
*reply = true
return nil
@ -2583,7 +2541,7 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error {
TokenIDs: []string{token.AccessorID},
}
resp, err := a.srv.raftApply(structs.ACLTokenDeleteRequestType, req)
_, err = a.srv.raftApply(structs.ACLTokenDeleteRequestType, req)
if err != nil {
return fmt.Errorf("Failed to apply token delete request: %v", err)
}
@ -2591,10 +2549,6 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error {
// Purge the identity from the cache to prevent using the previous definition of the identity
a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID))
if respErr, ok := resp.(error); ok {
return respErr
}
*reply = true
return nil

View File

@ -6,11 +6,12 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
)
var ACLEndpointLegacySummaries = []prometheus.SummaryDefinition{
@ -68,9 +69,6 @@ func (a *ACL) Bootstrap(args *structs.DCSpecificRequest, reply *structs.ACL) err
return err
}
switch v := resp.(type) {
case error:
return v
case *structs.ACL:
*reply = *v
@ -143,11 +141,7 @@ func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) erro
// Apply the update
resp, err := srv.raftApply(structs.ACLRequestType, args)
if err != nil {
srv.logger.Error("Raft apply failed", "acl_op", args.Op, "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
return fmt.Errorf("raft apply failed: %w", err)
}
// Check if the return type is a string

View File

@ -86,14 +86,8 @@ func (r *aclTokenReplicator) DeleteLocalBatch(srv *Server, batch []string) error
TokenIDs: batch,
}
resp, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req)
return err
}
func (r *aclTokenReplicator) LenPendingUpdates() int {
@ -116,15 +110,8 @@ func (r *aclTokenReplicator) UpdateLocalBatch(ctx context.Context, srv *Server,
FromReplication: true,
}
resp, err := srv.raftApply(structs.ACLTokenSetRequestType, &req)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err := srv.raftApply(structs.ACLTokenSetRequestType, &req)
return err
}
///////////////////////
@ -199,14 +186,8 @@ func (r *aclPolicyReplicator) DeleteLocalBatch(srv *Server, batch []string) erro
PolicyIDs: batch,
}
resp, err := srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err := srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
return err
}
func (r *aclPolicyReplicator) LenPendingUpdates() int {
@ -226,16 +207,8 @@ func (r *aclPolicyReplicator) UpdateLocalBatch(ctx context.Context, srv *Server,
Policies: r.updated[start:end],
}
resp, err := srv.raftApply(structs.ACLPolicySetRequestType, &req)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err := srv.raftApply(structs.ACLPolicySetRequestType, &req)
return err
}
////////////////////////////////
@ -334,16 +307,8 @@ func (r *aclRoleReplicator) DeleteLocalBatch(srv *Server, batch []string) error
RoleIDs: batch,
}
resp, err := srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err := srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
return err
}
func (r *aclRoleReplicator) LenPendingUpdates() int {
@ -364,14 +329,6 @@ func (r *aclRoleReplicator) UpdateLocalBatch(ctx context.Context, srv *Server, s
AllowMissingLinks: true,
}
resp, err := srv.raftApply(structs.ACLRoleSetRequestType, &req)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err := srv.raftApply(structs.ACLRoleSetRequestType, &req)
return err
}

View File

@ -5,8 +5,9 @@ import (
"fmt"
"time"
"github.com/hashicorp/consul/agent/structs"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/structs"
)
func (s *Server) reapExpiredTokens(ctx context.Context) error {
@ -102,7 +103,7 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) {
"amount", len(req.TokenIDs),
"locality", locality,
)
resp, err := s.raftApply(structs.ACLTokenDeleteRequestType, &req)
_, err = s.raftApply(structs.ACLTokenDeleteRequestType, &req)
if err != nil {
return 0, fmt.Errorf("Failed to apply token expiration deletions: %v", err)
}
@ -112,10 +113,6 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) {
s.acls.cache.RemoveIdentity(tokenSecretCacheID(secretID))
}
if respErr, ok := resp.(error); ok {
return 0, respErr
}
return len(req.TokenIDs), nil
}

View File

@ -7,15 +7,16 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/types"
bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
)
var CatalogCounters = []prometheus.CounterDefinition{
@ -210,14 +211,8 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
}
}
resp, err := c.srv.raftApply(structs.RegisterRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err = c.srv.raftApply(structs.RegisterRequestType, args)
return err
}
// Deregister is used to remove a service registration for a given node.
@ -268,10 +263,8 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
}
if _, err := c.srv.raftApply(structs.DeregisterRequestType, args); err != nil {
return err
}
return nil
_, err = c.srv.raftApply(structs.DeregisterRequestType, args)
return err
}
// ListDatacenters is used to query for the list of known datacenters

View File

@ -92,9 +92,6 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
@ -296,14 +293,8 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{})
}
args.Op = structs.ConfigEntryDelete
resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err = c.srv.raftApply(structs.ConfigEntryRequestType, args)
return err
}
// ResolveServiceConfig

View File

@ -7,8 +7,9 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/structs"
)
func configSort(configs []structs.ConfigEntry) {
@ -97,15 +98,11 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con
Entry: entry,
}
resp, err := s.raftApply(structs.ConfigEntryRequestType, &req)
_, err := s.raftApply(structs.ConfigEntryRequestType, &req)
if err != nil {
return false, fmt.Errorf("Failed to apply config %s: %v", op, err)
}
if respErr, ok := resp.(error); ok {
return false, fmt.Errorf("Failed to apply config %s: %v", op, respErr)
}
if i < len(configs)-1 {
select {
case <-ctx.Done():

View File

@ -16,13 +16,5 @@ func (c *consulCADelegate) State() *state.Store {
}
func (c *consulCADelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) {
resp, err := c.srv.raftApply(structs.ConnectCARequestType, req)
if err != nil {
return nil, err
}
if respErr, ok := resp.(error); ok {
return nil, respErr
}
return resp, nil
return c.srv.raftApply(structs.ConnectCARequestType, req)
}

View File

@ -6,13 +6,14 @@ import (
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
)
// Coordinate manages queries and updates for network coordinates.
@ -105,13 +106,10 @@ func (c *Coordinate) batchApplyUpdates() error {
t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag
slice := updates[start:end]
resp, err := c.srv.raftApply(t, slice)
_, err := c.srv.raftApply(t, slice)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
}
return nil
}

View File

@ -7,10 +7,11 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
var FederationStateSummaries = []prometheus.SummaryDefinition{
@ -85,9 +86,6 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
if respBool, ok := resp.(bool); ok {
*reply = respBool
}

View File

@ -154,15 +154,11 @@ func (r *FederationStateReplicator) PerformDeletions(ctx context.Context, deleti
State: state,
}
resp, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
_, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
if err != nil {
return false, err
}
if respErr, ok := resp.(error); ok {
return false, respErr
}
if i < len(deletions)-1 {
select {
case <-ctx.Done():
@ -199,15 +195,11 @@ func (r *FederationStateReplicator) PerformUpdates(ctx context.Context, updatesR
State: state2,
}
resp, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
_, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
if err != nil {
return false, err
}
if respErr, ok := resp.(error); ok {
return false, respErr
}
if i < len(updates)-1 {
select {
case <-ctx.Done():

View File

@ -7,13 +7,14 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
)
var IntentionSummaries = []prometheus.SummaryDefinition{
@ -157,15 +158,8 @@ func (s *Intention) Apply(args *structs.IntentionRequest, reply *string) error {
args.Mutation = mut
args.Intention = nil
resp, err := s.srv.raftApply(structs.IntentionRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err = s.srv.raftApply(structs.IntentionRequestType, args)
return err
}
func (s *Intention) computeApplyChangesLegacyCreate(

View File

@ -7,12 +7,13 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
)
var KVSummaries = []prometheus.SummaryDefinition{
@ -122,11 +123,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
// Apply the update.
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil {
k.logger.Error("Raft apply failed", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
return fmt.Errorf("raft apply failed: %w", err)
}
// Check if the return type is a bool.

View File

@ -12,13 +12,6 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
@ -26,6 +19,14 @@ import (
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/types"
)
var LeaderSummaries = []prometheus.SummaryDefinition{
@ -478,9 +479,6 @@ func (s *Server) initializeLegacyACL() error {
return fmt.Errorf("failed to initialize ACL bootstrap: %v", err)
}
switch v := resp.(type) {
case error:
return fmt.Errorf("failed to initialize ACL bootstrap: %v", v)
case bool:
if v {
s.logger.Info("ACL bootstrap enabled")
@ -766,14 +764,10 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true}
resp, err := s.raftApply(structs.ACLTokenSetRequestType, req)
_, err = s.raftApply(structs.ACLTokenSetRequestType, req)
if err != nil {
s.logger.Error("failed to apply acl token upgrade batch", "error", err)
}
if err, ok := resp.(error); ok {
s.logger.Error("failed to apply acl token upgrade batch", "error", err)
}
}
}
@ -1088,12 +1082,7 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
Entry: entry,
}
resp, err := s.raftApply(structs.ConfigEntryRequestType, &req)
if err == nil {
if respErr, ok := resp.(error); ok {
err = respErr
}
}
_, err := s.raftApply(structs.ConfigEntryRequestType, &req)
if err != nil {
return fmt.Errorf("Failed to apply configuration entry %q / %q: %v", entry.GetKind(), entry.GetName(), err)
}

View File

@ -135,15 +135,8 @@ func (s *Server) pruneCARoots() error {
args.Op = structs.CAOpSetRoots
args.Index = idx
args.Roots = newRoots
resp, err := s.raftApply(structs.ConnectCARequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err = s.raftApply(structs.ConnectCARequestType, args)
return err
}
// retryLoopBackoff loops a given function indefinitely, backing off exponentially

View File

@ -628,15 +628,12 @@ func TestLeader_Vault_PrimaryCA_FixSigningKeyID_OnRestart(t *testing.T) {
activePrimaryRoot.SigningKeyID = primaryRootSigningKeyID
// Store the root cert in raft
resp, err := s1pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{
_, err = s1pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{
Op: structs.CAOpSetRoots,
Index: idx,
Roots: []*structs.CARoot{activePrimaryRoot},
})
require.NoError(t, err)
if respErr, ok := resp.(error); ok {
t.Fatalf("respErr: %v", respErr)
}
}
// Shutdown s1pre and restart it to trigger the secondary CA init to correct
@ -731,15 +728,12 @@ func TestLeader_SecondaryCA_FixSigningKeyID_via_IntermediateRefresh(t *testing.T
activeSecondaryRoot.SigningKeyID = secondaryRootSigningKeyID
// Store the root cert in raft
resp, err := s2pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{
_, err = s2pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{
Op: structs.CAOpSetRoots,
Index: idx,
Roots: []*structs.CARoot{activeSecondaryRoot},
})
require.NoError(t, err)
if respErr, ok := resp.(error); ok {
t.Fatalf("respErr: %v", respErr)
}
}
// Shutdown s2pre and restart it to trigger the secondary CA init to correct

View File

@ -5,9 +5,10 @@ import (
"fmt"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
const (
@ -117,13 +118,10 @@ func (s *Server) updateOurFederationState(curr *structs.FederationState) error {
if s.config.Datacenter == s.config.PrimaryDatacenter {
// We are the primary, so we can't do an RPC as we don't have a replication token.
resp, err := s.raftApply(structs.FederationStateRequestType, args)
_, err := s.raftApply(structs.FederationStateRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
} else {
args.WriteRequest = structs.WriteRequest{
Token: s.tokens.ReplicationToken(),
@ -225,13 +223,10 @@ func (s *Server) pruneStaleFederationStates() error {
Datacenter: dc,
},
}
resp, err := s.raftApply(structs.FederationStateRequestType, &req)
_, err := s.raftApply(structs.FederationStateRequestType, &req)
if err != nil {
return fmt.Errorf("Failed to delete federation state %s: %v", dc, err)
}
if respErr, ok := resp.(error); ok {
return fmt.Errorf("Failed to delete federation state %s: %v", dc, respErr)
}
}
return nil

View File

@ -164,10 +164,8 @@ func (s *Server) legacyIntentionsMigrationCleanupPhase(quiet bool) error {
req := structs.IntentionRequest{
Op: structs.IntentionOpDeleteAll,
}
if resp, err := s.raftApply(structs.IntentionRequestType, req); err != nil {
if _, err := s.raftApply(structs.IntentionRequestType, req); err != nil {
return err
} else if respErr, ok := resp.(error); ok {
return respErr
}
// Bypass the serf component and jump right to the final state.
@ -410,9 +408,6 @@ func (s *Server) replicateLegacyIntentionsOnce(ctx context.Context, lastFetchInd
if err != nil {
return 0, false, err
}
if respErr, ok := resp.(error); ok {
return 0, false, respErr
}
if txnResp, ok := resp.(structs.TxnResponse); ok {
if len(txnResp.Errors) > 0 {

View File

@ -108,14 +108,8 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
if req.Op != structs.IntentionOpDelete {
req2.Intention.Hash = req.Intention.Hash // not part of Clone
}
resp, err := s.raftApply(structs.IntentionRequestType, req2)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err := s.raftApply(structs.IntentionRequestType, req2)
return err
}
// Directly insert legacy intentions into raft in dc1.
@ -442,14 +436,11 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
var retained []*structs.Intention
for _, ixn := range ixns {
ixn2 := *ixn
resp, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{
_, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{
Op: structs.IntentionOpCreate,
Intention: &ixn2,
})
require.NoError(t, err)
if respErr, ok := resp.(error); ok {
t.Fatalf("respErr: %v", respErr)
}
if _, present := ixn.Meta["unit-test-discarded"]; !present {
retained = append(retained, ixn)

View File

@ -3,10 +3,11 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
)
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
@ -62,11 +63,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
// Apply the update
resp, err := op.srv.raftApply(structs.AutopilotRequestType, args)
if err != nil {
op.logger.Error("Raft apply failed", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
return fmt.Errorf("raft apply failed: %w", err)
}
// Check if the return type is a bool.

View File

@ -7,13 +7,14 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
)
var PreparedQuerySummaries = []prometheus.SummaryDefinition{
@ -128,15 +129,10 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string)
}
// Commit the query to the state store.
resp, err := p.srv.raftApply(structs.PreparedQueryRequestType, args)
_, err = p.srv.raftApply(structs.PreparedQueryRequestType, args)
if err != nil {
p.logger.Error("Raft apply failed", "error", err)
return err
return fmt.Errorf("raft apply failed: %w", err)
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}

View File

@ -14,14 +14,6 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/wanfed"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
connlimit "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
@ -30,6 +22,15 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/raft"
"github.com/hashicorp/yamux"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/wanfed"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
)
var RPCCounters = []prometheus.CounterDefinition{
@ -534,9 +535,8 @@ func canRetry(args interface{}, err error) bool {
return true
}
// If we are chunking and it doesn't seem to have completed, try again
intErr, ok := args.(error)
if ok && strings.Contains(intErr.Error(), ErrChunkingResubmit.Error()) {
// If we are chunking and it doesn't seem to have completed, try again.
if err != nil && strings.Contains(err.Error(), ErrChunkingResubmit.Error()) {
return true
}
@ -729,28 +729,34 @@ func (s *Server) keyringRPCs(method string, args interface{}, dcs []string) (*st
type raftEncoder func(structs.MessageType, interface{}) ([]byte, error)
// raftApply is used to encode a message, run it through raft, and return
// the FSM response along with any errors
// raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See
// raftApplyWithEncoder.
// Deprecated: use raftApplyMsgpack
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
return s.raftApplyMsgpack(t, msg)
}
// raftApplyMsgpack will msgpack encode the request and then run it through raft,
// then return the FSM response along with any errors.
// raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See
// raftApplyWithEncoder.
func (s *Server) raftApplyMsgpack(t structs.MessageType, msg interface{}) (interface{}, error) {
return s.raftApplyWithEncoder(t, msg, structs.Encode)
}
// raftApplyProtobuf will protobuf encode the request and then run it through raft,
// then return the FSM response along with any errors.
// raftApplyProtobuf encodes the msg using protobuf and calls raft.Apply. See
// raftApplyWithEncoder.
func (s *Server) raftApplyProtobuf(t structs.MessageType, msg interface{}) (interface{}, error) {
return s.raftApplyWithEncoder(t, msg, structs.EncodeProtoInterface)
}
// raftApplyWithEncoder is used to encode a message, run it through raft,
// and return the FSM response along with any errors. Unlike raftApply this
// takes the encoder to use as an argument.
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) {
// raftApplyWithEncoder encodes a message, and then calls raft.Apply with the
// encoded message. Returns the FSM response along with any errors. If the
// FSM.Apply response is an error it will be returned as the error return
// value with a nil response.
func (s *Server) raftApplyWithEncoder(
t structs.MessageType,
msg interface{},
encoder raftEncoder,
) (response interface{}, err error) {
if encoder == nil {
return nil, fmt.Errorf("Failed to encode request: nil encoder")
}
@ -789,17 +795,19 @@ func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, en
// apply function. Downstream client code expects to see any error
// from the FSM (as opposed to the apply itself) and decide whether
// it can retry in the future's response.
return ErrChunkingResubmit, nil
return nil, ErrChunkingResubmit
}
// We expect that this conversion should always work
chunkedSuccess, ok := resp.(raftchunking.ChunkingSuccess)
if !ok {
return nil, errors.New("unknown type of response back from chunking FSM")
}
// Return the inner wrapped response
return chunkedSuccess.Response, nil
resp = chunkedSuccess.Response
}
if err, ok := resp.(error); ok {
return nil, err
}
return resp, nil
}

View File

@ -6,12 +6,13 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
)
var SessionEndpointSummaries = []prometheus.SummaryDefinition{
@ -147,8 +148,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
// Apply the update
resp, err := s.srv.raftApply(structs.SessionRequestType, args)
if err != nil {
s.logger.Error("Apply failed", "error", err)
return err
return fmt.Errorf("apply failed: %w", err)
}
if args.Op == structs.SessionCreate && args.Session.TTL != "" {
@ -160,10 +160,6 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
s.srv.clearSessionTimer(args.Session.ID)
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a string
if respString, ok := resp.(string); ok {
*reply = respString

View File

@ -22,15 +22,8 @@ func (s *Server) setSystemMetadataKey(key, val string) error {
Entry: &structs.SystemMetadataEntry{Key: key, Value: val},
}
resp, err := s.raftApply(structs.SystemMetadataRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err := s.raftApply(structs.SystemMetadataRequestType, args)
return err
}
func (s *Server) deleteSystemMetadataKey(key string) error {
@ -39,13 +32,6 @@ func (s *Server) deleteSystemMetadataKey(key string) error {
Entry: &structs.SystemMetadataEntry{Key: key},
}
resp, err := s.raftApply(structs.SystemMetadataRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
_, err := s.raftApply(structs.SystemMetadataRequestType, args)
return err
}

View File

@ -6,10 +6,11 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
)
var TxnSummaries = []prometheus.SummaryDefinition{
@ -138,11 +139,7 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error
// Apply the update.
resp, err := t.srv.raftApply(structs.TxnRequestType, args)
if err != nil {
t.logger.Error("Raft apply failed", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
return fmt.Errorf("raft apply failed: %w", err)
}
// Convert the return type. This should be a cheap copy since we are