Handle FSM.Apply errors in raftApply
Previously we were inconsistently checking the response for errors. This PR moves the response-is-error check into raftApply, so that all callers can look at only the error response, instead of having to know that errors could come from two places. This should expose a few more errors that were previously hidden because in some calls to raftApply we were ignoring the response return value. Also handle errors more consistently. In some cases we would log the error before returning it. This can be very confusing because it can result in the same error being logged multiple times. Instead return a wrapped error.
This commit is contained in:
parent
d11823804d
commit
8654adfc53
|
@ -13,16 +13,17 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/template"
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -250,15 +251,11 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC
|
|||
|
||||
req.Token.SetHash(true)
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLBootstrapRequestType, &req)
|
||||
_, err = a.srv.raftApply(structs.ACLBootstrapRequestType, &req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err, ok := resp.(error); ok {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, token, err := state.ACLTokenGetByAccessor(nil, accessor, structs.DefaultEnterpriseMeta()); err == nil {
|
||||
*reply = *token
|
||||
}
|
||||
|
@ -729,7 +726,7 @@ func (a *ACL) tokenSetInternal(args *structs.ACLTokenSetRequest, reply *structs.
|
|||
req.ProhibitUnprivileged = true
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLTokenSetRequestType, req)
|
||||
_, err = a.srv.raftApply(structs.ACLTokenSetRequestType, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply token write request: %v", err)
|
||||
}
|
||||
|
@ -737,10 +734,6 @@ func (a *ACL) tokenSetInternal(args *structs.ACLTokenSetRequest, reply *structs.
|
|||
// Purge the identity from the cache to prevent using the previous definition of the identity
|
||||
a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID))
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
// Don't check expiration times here as it doesn't really matter.
|
||||
if _, updatedToken, err := a.srv.fsm.State().ACLTokenGetByAccessor(nil, token.AccessorID, nil); err == nil && updatedToken != nil {
|
||||
*reply = *updatedToken
|
||||
|
@ -885,7 +878,7 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er
|
|||
TokenIDs: []string{args.TokenID},
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLTokenDeleteRequestType, req)
|
||||
_, err = a.srv.raftApply(structs.ACLTokenDeleteRequestType, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply token delete request: %v", err)
|
||||
}
|
||||
|
@ -893,10 +886,6 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er
|
|||
// Purge the identity from the cache to prevent using the previous definition of the identity
|
||||
a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID))
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
if reply != nil {
|
||||
*reply = token.AccessorID
|
||||
}
|
||||
|
@ -1218,7 +1207,7 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol
|
|||
Policies: structs.ACLPolicies{policy},
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLPolicySetRequestType, req)
|
||||
_, err = a.srv.raftApply(structs.ACLPolicySetRequestType, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply policy upsert request: %v", err)
|
||||
}
|
||||
|
@ -1226,10 +1215,6 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol
|
|||
// Remove from the cache to prevent stale cache usage
|
||||
a.srv.acls.cache.RemovePolicy(policy.ID)
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
if _, policy, err := a.srv.fsm.State().ACLPolicyGetByID(nil, policy.ID, &policy.EnterpriseMeta); err == nil && policy != nil {
|
||||
*reply = *policy
|
||||
}
|
||||
|
@ -1282,17 +1267,13 @@ func (a *ACL) PolicyDelete(args *structs.ACLPolicyDeleteRequest, reply *string)
|
|||
PolicyIDs: []string{args.PolicyID},
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
|
||||
_, err = a.srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply policy delete request: %v", err)
|
||||
}
|
||||
|
||||
a.srv.acls.cache.RemovePolicy(policy.ID)
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
*reply = policy.Name
|
||||
|
||||
return nil
|
||||
|
@ -1687,7 +1668,7 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e
|
|||
Roles: structs.ACLRoles{role},
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLRoleSetRequestType, req)
|
||||
_, err = a.srv.raftApply(structs.ACLRoleSetRequestType, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply role upsert request: %v", err)
|
||||
}
|
||||
|
@ -1695,10 +1676,6 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e
|
|||
// Remove from the cache to prevent stale cache usage
|
||||
a.srv.acls.cache.RemoveRole(role.ID)
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
if _, role, err := a.srv.fsm.State().ACLRoleGetByID(nil, role.ID, &role.EnterpriseMeta); err == nil && role != nil {
|
||||
*reply = *role
|
||||
}
|
||||
|
@ -1747,17 +1724,13 @@ func (a *ACL) RoleDelete(args *structs.ACLRoleDeleteRequest, reply *string) erro
|
|||
RoleIDs: []string{args.RoleID},
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
|
||||
_, err = a.srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply role delete request: %v", err)
|
||||
}
|
||||
|
||||
a.srv.acls.cache.RemoveRole(role.ID)
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
*reply = role.Name
|
||||
|
||||
return nil
|
||||
|
@ -2014,13 +1987,10 @@ func (a *ACL) BindingRuleSet(args *structs.ACLBindingRuleSetRequest, reply *stru
|
|||
BindingRules: structs.ACLBindingRules{rule},
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLBindingRuleSetRequestType, req)
|
||||
_, err = a.srv.raftApply(structs.ACLBindingRuleSetRequestType, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply binding rule upsert request: %v", err)
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return fmt.Errorf("Failed to apply binding rule upsert request: %v", respErr)
|
||||
}
|
||||
|
||||
if _, rule, err := a.srv.fsm.State().ACLBindingRuleGetByID(nil, rule.ID, &rule.EnterpriseMeta); err == nil && rule != nil {
|
||||
*reply = *rule
|
||||
|
@ -2070,15 +2040,11 @@ func (a *ACL) BindingRuleDelete(args *structs.ACLBindingRuleDeleteRequest, reply
|
|||
BindingRuleIDs: []string{args.BindingRuleID},
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLBindingRuleDeleteRequestType, &req)
|
||||
_, err = a.srv.raftApply(structs.ACLBindingRuleDeleteRequestType, &req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply binding rule delete request: %v", err)
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
*reply = true
|
||||
|
||||
return nil
|
||||
|
@ -2266,15 +2232,11 @@ func (a *ACL) AuthMethodSet(args *structs.ACLAuthMethodSetRequest, reply *struct
|
|||
AuthMethods: structs.ACLAuthMethods{method},
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLAuthMethodSetRequestType, req)
|
||||
_, err = a.srv.raftApply(structs.ACLAuthMethodSetRequestType, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply auth method upsert request: %v", err)
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
if _, method, err := a.srv.fsm.State().ACLAuthMethodGetByName(nil, method.Name, &method.EnterpriseMeta); err == nil && method != nil {
|
||||
*reply = *method
|
||||
}
|
||||
|
@ -2328,15 +2290,11 @@ func (a *ACL) AuthMethodDelete(args *structs.ACLAuthMethodDeleteRequest, reply *
|
|||
EnterpriseMeta: args.EnterpriseMeta,
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLAuthMethodDeleteRequestType, &req)
|
||||
_, err = a.srv.raftApply(structs.ACLAuthMethodDeleteRequestType, &req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply auth method delete request: %v", err)
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
*reply = true
|
||||
|
||||
return nil
|
||||
|
@ -2583,7 +2541,7 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error {
|
|||
TokenIDs: []string{token.AccessorID},
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLTokenDeleteRequestType, req)
|
||||
_, err = a.srv.raftApply(structs.ACLTokenDeleteRequestType, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply token delete request: %v", err)
|
||||
}
|
||||
|
@ -2591,10 +2549,6 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error {
|
|||
// Purge the identity from the cache to prevent using the previous definition of the identity
|
||||
a.srv.acls.cache.RemoveIdentity(tokenSecretCacheID(token.SecretID))
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
*reply = true
|
||||
|
||||
return nil
|
||||
|
|
|
@ -6,11 +6,12 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
var ACLEndpointLegacySummaries = []prometheus.SummaryDefinition{
|
||||
|
@ -68,9 +69,6 @@ func (a *ACL) Bootstrap(args *structs.DCSpecificRequest, reply *structs.ACL) err
|
|||
return err
|
||||
}
|
||||
switch v := resp.(type) {
|
||||
case error:
|
||||
return v
|
||||
|
||||
case *structs.ACL:
|
||||
*reply = *v
|
||||
|
||||
|
@ -143,11 +141,7 @@ func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) erro
|
|||
// Apply the update
|
||||
resp, err := srv.raftApply(structs.ACLRequestType, args)
|
||||
if err != nil {
|
||||
srv.logger.Error("Raft apply failed", "acl_op", args.Op, "error", err)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
return fmt.Errorf("raft apply failed: %w", err)
|
||||
}
|
||||
|
||||
// Check if the return type is a string
|
||||
|
|
|
@ -86,15 +86,9 @@ func (r *aclTokenReplicator) DeleteLocalBatch(srv *Server, batch []string) error
|
|||
TokenIDs: batch,
|
||||
}
|
||||
|
||||
resp, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req)
|
||||
if err != nil {
|
||||
_, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *aclTokenReplicator) LenPendingUpdates() int {
|
||||
return len(r.updated)
|
||||
|
@ -116,16 +110,9 @@ func (r *aclTokenReplicator) UpdateLocalBatch(ctx context.Context, srv *Server,
|
|||
FromReplication: true,
|
||||
}
|
||||
|
||||
resp, err := srv.raftApply(structs.ACLTokenSetRequestType, &req)
|
||||
if err != nil {
|
||||
_, err := srv.raftApply(structs.ACLTokenSetRequestType, &req)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
///////////////////////
|
||||
|
||||
|
@ -199,15 +186,9 @@ func (r *aclPolicyReplicator) DeleteLocalBatch(srv *Server, batch []string) erro
|
|||
PolicyIDs: batch,
|
||||
}
|
||||
|
||||
resp, err := srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
|
||||
if err != nil {
|
||||
_, err := srv.raftApply(structs.ACLPolicyDeleteRequestType, &req)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *aclPolicyReplicator) LenPendingUpdates() int {
|
||||
return len(r.updated)
|
||||
|
@ -226,18 +207,10 @@ func (r *aclPolicyReplicator) UpdateLocalBatch(ctx context.Context, srv *Server,
|
|||
Policies: r.updated[start:end],
|
||||
}
|
||||
|
||||
resp, err := srv.raftApply(structs.ACLPolicySetRequestType, &req)
|
||||
if err != nil {
|
||||
_, err := srv.raftApply(structs.ACLPolicySetRequestType, &req)
|
||||
return err
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
////////////////////////////////
|
||||
|
||||
type aclRoleReplicator struct {
|
||||
|
@ -334,18 +307,10 @@ func (r *aclRoleReplicator) DeleteLocalBatch(srv *Server, batch []string) error
|
|||
RoleIDs: batch,
|
||||
}
|
||||
|
||||
resp, err := srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
|
||||
if err != nil {
|
||||
_, err := srv.raftApply(structs.ACLRoleDeleteRequestType, &req)
|
||||
return err
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *aclRoleReplicator) LenPendingUpdates() int {
|
||||
return len(r.updated)
|
||||
}
|
||||
|
@ -364,14 +329,6 @@ func (r *aclRoleReplicator) UpdateLocalBatch(ctx context.Context, srv *Server, s
|
|||
AllowMissingLinks: true,
|
||||
}
|
||||
|
||||
resp, err := srv.raftApply(structs.ACLRoleSetRequestType, &req)
|
||||
if err != nil {
|
||||
_, err := srv.raftApply(structs.ACLRoleSetRequestType, &req)
|
||||
return err
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -5,8 +5,9 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func (s *Server) reapExpiredTokens(ctx context.Context) error {
|
||||
|
@ -102,7 +103,7 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) {
|
|||
"amount", len(req.TokenIDs),
|
||||
"locality", locality,
|
||||
)
|
||||
resp, err := s.raftApply(structs.ACLTokenDeleteRequestType, &req)
|
||||
_, err = s.raftApply(structs.ACLTokenDeleteRequestType, &req)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("Failed to apply token expiration deletions: %v", err)
|
||||
}
|
||||
|
@ -112,10 +113,6 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) {
|
|||
s.acls.cache.RemoveIdentity(tokenSecretCacheID(secretID))
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return 0, respErr
|
||||
}
|
||||
|
||||
return len(req.TokenIDs), nil
|
||||
}
|
||||
|
||||
|
|
|
@ -7,15 +7,16 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
bexpr "github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
"github.com/hashicorp/consul/types"
|
||||
bexpr "github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
var CatalogCounters = []prometheus.CounterDefinition{
|
||||
|
@ -210,15 +211,9 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
|
|||
}
|
||||
}
|
||||
|
||||
resp, err := c.srv.raftApply(structs.RegisterRequestType, args)
|
||||
if err != nil {
|
||||
_, err = c.srv.raftApply(structs.RegisterRequestType, args)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Deregister is used to remove a service registration for a given node.
|
||||
func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error {
|
||||
|
@ -268,11 +263,9 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
|
|||
|
||||
}
|
||||
|
||||
if _, err := c.srv.raftApply(structs.DeregisterRequestType, args); err != nil {
|
||||
_, err = c.srv.raftApply(structs.DeregisterRequestType, args)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListDatacenters is used to query for the list of known datacenters
|
||||
func (c *Catalog) ListDatacenters(args *structs.DatacentersRequest, reply *[]string) error {
|
||||
|
|
|
@ -92,9 +92,6 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
if respBool, ok := resp.(bool); ok {
|
||||
*reply = respBool
|
||||
}
|
||||
|
@ -296,15 +293,9 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{})
|
|||
}
|
||||
|
||||
args.Op = structs.ConfigEntryDelete
|
||||
resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
|
||||
if err != nil {
|
||||
_, err = c.srv.raftApply(structs.ConfigEntryRequestType, args)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResolveServiceConfig
|
||||
func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, reply *structs.ServiceConfigResponse) error {
|
||||
|
|
|
@ -7,8 +7,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func configSort(configs []structs.ConfigEntry) {
|
||||
|
@ -97,15 +98,11 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con
|
|||
Entry: entry,
|
||||
}
|
||||
|
||||
resp, err := s.raftApply(structs.ConfigEntryRequestType, &req)
|
||||
_, err := s.raftApply(structs.ConfigEntryRequestType, &req)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Failed to apply config %s: %v", op, err)
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return false, fmt.Errorf("Failed to apply config %s: %v", op, respErr)
|
||||
}
|
||||
|
||||
if i < len(configs)-1 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -16,13 +16,5 @@ func (c *consulCADelegate) State() *state.Store {
|
|||
}
|
||||
|
||||
func (c *consulCADelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) {
|
||||
resp, err := c.srv.raftApply(structs.ConnectCARequestType, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return nil, respErr
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
return c.srv.raftApply(structs.ConnectCARequestType, req)
|
||||
}
|
||||
|
|
|
@ -6,13 +6,14 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
// Coordinate manages queries and updates for network coordinates.
|
||||
|
@ -105,13 +106,10 @@ func (c *Coordinate) batchApplyUpdates() error {
|
|||
t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag
|
||||
|
||||
slice := updates[start:end]
|
||||
resp, err := c.srv.raftApply(t, slice)
|
||||
_, err := c.srv.raftApply(t, slice)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -7,10 +7,11 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
var FederationStateSummaries = []prometheus.SummaryDefinition{
|
||||
|
@ -85,9 +86,6 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
if respBool, ok := resp.(bool); ok {
|
||||
*reply = respBool
|
||||
}
|
||||
|
|
|
@ -154,15 +154,11 @@ func (r *FederationStateReplicator) PerformDeletions(ctx context.Context, deleti
|
|||
State: state,
|
||||
}
|
||||
|
||||
resp, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
|
||||
_, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return false, respErr
|
||||
}
|
||||
|
||||
if i < len(deletions)-1 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -199,15 +195,11 @@ func (r *FederationStateReplicator) PerformUpdates(ctx context.Context, updatesR
|
|||
State: state2,
|
||||
}
|
||||
|
||||
resp, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
|
||||
_, err := r.srv.raftApply(structs.FederationStateRequestType, &req)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return false, respErr
|
||||
}
|
||||
|
||||
if i < len(updates)-1 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -7,13 +7,14 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
var IntentionSummaries = []prometheus.SummaryDefinition{
|
||||
|
@ -157,16 +158,9 @@ func (s *Intention) Apply(args *structs.IntentionRequest, reply *string) error {
|
|||
args.Mutation = mut
|
||||
args.Intention = nil
|
||||
|
||||
resp, err := s.srv.raftApply(structs.IntentionRequestType, args)
|
||||
if err != nil {
|
||||
_, err = s.srv.raftApply(structs.IntentionRequestType, args)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Intention) computeApplyChangesLegacyCreate(
|
||||
accessorID string,
|
||||
|
|
|
@ -7,12 +7,13 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
var KVSummaries = []prometheus.SummaryDefinition{
|
||||
|
@ -122,11 +123,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
|
|||
// Apply the update.
|
||||
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
|
||||
if err != nil {
|
||||
k.logger.Error("Raft apply failed", "error", err)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
return fmt.Errorf("raft apply failed: %w", err)
|
||||
}
|
||||
|
||||
// Check if the return type is a bool.
|
||||
|
|
|
@ -12,13 +12,6 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
@ -26,6 +19,14 @@ import (
|
|||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
var LeaderSummaries = []prometheus.SummaryDefinition{
|
||||
|
@ -478,9 +479,6 @@ func (s *Server) initializeLegacyACL() error {
|
|||
return fmt.Errorf("failed to initialize ACL bootstrap: %v", err)
|
||||
}
|
||||
switch v := resp.(type) {
|
||||
case error:
|
||||
return fmt.Errorf("failed to initialize ACL bootstrap: %v", v)
|
||||
|
||||
case bool:
|
||||
if v {
|
||||
s.logger.Info("ACL bootstrap enabled")
|
||||
|
@ -766,14 +764,10 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
|
|||
|
||||
req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true}
|
||||
|
||||
resp, err := s.raftApply(structs.ACLTokenSetRequestType, req)
|
||||
_, err = s.raftApply(structs.ACLTokenSetRequestType, req)
|
||||
if err != nil {
|
||||
s.logger.Error("failed to apply acl token upgrade batch", "error", err)
|
||||
}
|
||||
|
||||
if err, ok := resp.(error); ok {
|
||||
s.logger.Error("failed to apply acl token upgrade batch", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1088,12 +1082,7 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
|
|||
Entry: entry,
|
||||
}
|
||||
|
||||
resp, err := s.raftApply(structs.ConfigEntryRequestType, &req)
|
||||
if err == nil {
|
||||
if respErr, ok := resp.(error); ok {
|
||||
err = respErr
|
||||
}
|
||||
}
|
||||
_, err := s.raftApply(structs.ConfigEntryRequestType, &req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply configuration entry %q / %q: %v", entry.GetKind(), entry.GetName(), err)
|
||||
}
|
||||
|
|
|
@ -135,16 +135,9 @@ func (s *Server) pruneCARoots() error {
|
|||
args.Op = structs.CAOpSetRoots
|
||||
args.Index = idx
|
||||
args.Roots = newRoots
|
||||
resp, err := s.raftApply(structs.ConnectCARequestType, args)
|
||||
if err != nil {
|
||||
_, err = s.raftApply(structs.ConnectCARequestType, args)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// retryLoopBackoff loops a given function indefinitely, backing off exponentially
|
||||
// upon errors up to a maximum of maxRetryBackoff seconds.
|
||||
|
|
|
@ -628,15 +628,12 @@ func TestLeader_Vault_PrimaryCA_FixSigningKeyID_OnRestart(t *testing.T) {
|
|||
activePrimaryRoot.SigningKeyID = primaryRootSigningKeyID
|
||||
|
||||
// Store the root cert in raft
|
||||
resp, err := s1pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{
|
||||
_, err = s1pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{
|
||||
Op: structs.CAOpSetRoots,
|
||||
Index: idx,
|
||||
Roots: []*structs.CARoot{activePrimaryRoot},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
if respErr, ok := resp.(error); ok {
|
||||
t.Fatalf("respErr: %v", respErr)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown s1pre and restart it to trigger the secondary CA init to correct
|
||||
|
@ -731,15 +728,12 @@ func TestLeader_SecondaryCA_FixSigningKeyID_via_IntermediateRefresh(t *testing.T
|
|||
activeSecondaryRoot.SigningKeyID = secondaryRootSigningKeyID
|
||||
|
||||
// Store the root cert in raft
|
||||
resp, err := s2pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{
|
||||
_, err = s2pre.raftApply(structs.ConnectCARequestType, &structs.CARequest{
|
||||
Op: structs.CAOpSetRoots,
|
||||
Index: idx,
|
||||
Roots: []*structs.CARoot{activeSecondaryRoot},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
if respErr, ok := resp.(error); ok {
|
||||
t.Fatalf("respErr: %v", respErr)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown s2pre and restart it to trigger the secondary CA init to correct
|
||||
|
|
|
@ -5,9 +5,10 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -117,13 +118,10 @@ func (s *Server) updateOurFederationState(curr *structs.FederationState) error {
|
|||
|
||||
if s.config.Datacenter == s.config.PrimaryDatacenter {
|
||||
// We are the primary, so we can't do an RPC as we don't have a replication token.
|
||||
resp, err := s.raftApply(structs.FederationStateRequestType, args)
|
||||
_, err := s.raftApply(structs.FederationStateRequestType, args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
} else {
|
||||
args.WriteRequest = structs.WriteRequest{
|
||||
Token: s.tokens.ReplicationToken(),
|
||||
|
@ -225,13 +223,10 @@ func (s *Server) pruneStaleFederationStates() error {
|
|||
Datacenter: dc,
|
||||
},
|
||||
}
|
||||
resp, err := s.raftApply(structs.FederationStateRequestType, &req)
|
||||
_, err := s.raftApply(structs.FederationStateRequestType, &req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to delete federation state %s: %v", dc, err)
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return fmt.Errorf("Failed to delete federation state %s: %v", dc, respErr)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -164,10 +164,8 @@ func (s *Server) legacyIntentionsMigrationCleanupPhase(quiet bool) error {
|
|||
req := structs.IntentionRequest{
|
||||
Op: structs.IntentionOpDeleteAll,
|
||||
}
|
||||
if resp, err := s.raftApply(structs.IntentionRequestType, req); err != nil {
|
||||
if _, err := s.raftApply(structs.IntentionRequestType, req); err != nil {
|
||||
return err
|
||||
} else if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
// Bypass the serf component and jump right to the final state.
|
||||
|
@ -410,9 +408,6 @@ func (s *Server) replicateLegacyIntentionsOnce(ctx context.Context, lastFetchInd
|
|||
if err != nil {
|
||||
return 0, false, err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return 0, false, respErr
|
||||
}
|
||||
|
||||
if txnResp, ok := resp.(structs.TxnResponse); ok {
|
||||
if len(txnResp.Errors) > 0 {
|
||||
|
|
|
@ -108,15 +108,9 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
|
|||
if req.Op != structs.IntentionOpDelete {
|
||||
req2.Intention.Hash = req.Intention.Hash // not part of Clone
|
||||
}
|
||||
resp, err := s.raftApply(structs.IntentionRequestType, req2)
|
||||
if err != nil {
|
||||
_, err := s.raftApply(structs.IntentionRequestType, req2)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Directly insert legacy intentions into raft in dc1.
|
||||
id := generateUUID()
|
||||
|
@ -442,14 +436,11 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
|||
var retained []*structs.Intention
|
||||
for _, ixn := range ixns {
|
||||
ixn2 := *ixn
|
||||
resp, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{
|
||||
_, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{
|
||||
Op: structs.IntentionOpCreate,
|
||||
Intention: &ixn2,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
if respErr, ok := resp.(error); ok {
|
||||
t.Fatalf("respErr: %v", respErr)
|
||||
}
|
||||
|
||||
if _, present := ixn.Meta["unit-test-discarded"]; !present {
|
||||
retained = append(retained, ixn)
|
||||
|
|
|
@ -3,10 +3,11 @@ package consul
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
autopilot "github.com/hashicorp/raft-autopilot"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
|
||||
|
@ -62,11 +63,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
|
|||
// Apply the update
|
||||
resp, err := op.srv.raftApply(structs.AutopilotRequestType, args)
|
||||
if err != nil {
|
||||
op.logger.Error("Raft apply failed", "error", err)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
return fmt.Errorf("raft apply failed: %w", err)
|
||||
}
|
||||
|
||||
// Check if the return type is a bool.
|
||||
|
|
|
@ -7,13 +7,14 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
var PreparedQuerySummaries = []prometheus.SummaryDefinition{
|
||||
|
@ -128,15 +129,10 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string)
|
|||
}
|
||||
|
||||
// Commit the query to the state store.
|
||||
resp, err := p.srv.raftApply(structs.PreparedQueryRequestType, args)
|
||||
_, err = p.srv.raftApply(structs.PreparedQueryRequestType, args)
|
||||
if err != nil {
|
||||
p.logger.Error("Raft apply failed", "error", err)
|
||||
return err
|
||||
return fmt.Errorf("raft apply failed: %w", err)
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -14,14 +14,6 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/wanfed"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
connlimit "github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
|
@ -30,6 +22,15 @@ import (
|
|||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/yamux"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/wanfed"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
)
|
||||
|
||||
var RPCCounters = []prometheus.CounterDefinition{
|
||||
|
@ -729,28 +730,34 @@ func (s *Server) keyringRPCs(method string, args interface{}, dcs []string) (*st
|
|||
|
||||
type raftEncoder func(structs.MessageType, interface{}) ([]byte, error)
|
||||
|
||||
// raftApply is used to encode a message, run it through raft, and return
|
||||
// the FSM response along with any errors
|
||||
// raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See
|
||||
// raftApplyWithEncoder.
|
||||
// Deprecated: use raftApplyMsgpack
|
||||
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||
return s.raftApplyMsgpack(t, msg)
|
||||
}
|
||||
|
||||
// raftApplyMsgpack will msgpack encode the request and then run it through raft,
|
||||
// then return the FSM response along with any errors.
|
||||
// raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See
|
||||
// raftApplyWithEncoder.
|
||||
func (s *Server) raftApplyMsgpack(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||
return s.raftApplyWithEncoder(t, msg, structs.Encode)
|
||||
}
|
||||
|
||||
// raftApplyProtobuf will protobuf encode the request and then run it through raft,
|
||||
// then return the FSM response along with any errors.
|
||||
// raftApplyProtobuf encodes the msg using protobuf and calls raft.Apply. See
|
||||
// raftApplyWithEncoder.
|
||||
func (s *Server) raftApplyProtobuf(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||
return s.raftApplyWithEncoder(t, msg, structs.EncodeProtoInterface)
|
||||
}
|
||||
|
||||
// raftApplyWithEncoder is used to encode a message, run it through raft,
|
||||
// and return the FSM response along with any errors. Unlike raftApply this
|
||||
// takes the encoder to use as an argument.
|
||||
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) {
|
||||
// raftApplyWithEncoder encodes a message, and then calls raft.Apply with the
|
||||
// encoded message. Returns the FSM response along with any errors. If the
|
||||
// FSM.Apply response is an error it will be returned as the error return
|
||||
// value with a nil response.
|
||||
func (s *Server) raftApplyWithEncoder(
|
||||
t structs.MessageType,
|
||||
msg interface{},
|
||||
encoder raftEncoder,
|
||||
) (response interface{}, err error) {
|
||||
if encoder == nil {
|
||||
return nil, fmt.Errorf("Failed to encode request: nil encoder")
|
||||
}
|
||||
|
@ -789,17 +796,19 @@ func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, en
|
|||
// apply function. Downstream client code expects to see any error
|
||||
// from the FSM (as opposed to the apply itself) and decide whether
|
||||
// it can retry in the future's response.
|
||||
return ErrChunkingResubmit, nil
|
||||
return nil, ErrChunkingResubmit
|
||||
}
|
||||
// We expect that this conversion should always work
|
||||
chunkedSuccess, ok := resp.(raftchunking.ChunkingSuccess)
|
||||
if !ok {
|
||||
return nil, errors.New("unknown type of response back from chunking FSM")
|
||||
}
|
||||
// Return the inner wrapped response
|
||||
return chunkedSuccess.Response, nil
|
||||
resp = chunkedSuccess.Response
|
||||
}
|
||||
|
||||
if err, ok := resp.(error); ok {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -6,12 +6,13 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
var SessionEndpointSummaries = []prometheus.SummaryDefinition{
|
||||
|
@ -147,8 +148,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
|
|||
// Apply the update
|
||||
resp, err := s.srv.raftApply(structs.SessionRequestType, args)
|
||||
if err != nil {
|
||||
s.logger.Error("Apply failed", "error", err)
|
||||
return err
|
||||
return fmt.Errorf("apply failed: %w", err)
|
||||
}
|
||||
|
||||
if args.Op == structs.SessionCreate && args.Session.TTL != "" {
|
||||
|
@ -160,10 +160,6 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
|
|||
s.srv.clearSessionTimer(args.Session.ID)
|
||||
}
|
||||
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
// Check if the return type is a string
|
||||
if respString, ok := resp.(string); ok {
|
||||
*reply = respString
|
||||
|
|
|
@ -22,16 +22,9 @@ func (s *Server) setSystemMetadataKey(key, val string) error {
|
|||
Entry: &structs.SystemMetadataEntry{Key: key, Value: val},
|
||||
}
|
||||
|
||||
resp, err := s.raftApply(structs.SystemMetadataRequestType, args)
|
||||
if err != nil {
|
||||
_, err := s.raftApply(structs.SystemMetadataRequestType, args)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) deleteSystemMetadataKey(key string) error {
|
||||
args := &structs.SystemMetadataRequest{
|
||||
|
@ -39,13 +32,6 @@ func (s *Server) deleteSystemMetadataKey(key string) error {
|
|||
Entry: &structs.SystemMetadataEntry{Key: key},
|
||||
}
|
||||
|
||||
resp, err := s.raftApply(structs.SystemMetadataRequestType, args)
|
||||
if err != nil {
|
||||
_, err := s.raftApply(structs.SystemMetadataRequestType, args)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -6,10 +6,11 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
var TxnSummaries = []prometheus.SummaryDefinition{
|
||||
|
@ -138,11 +139,7 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error
|
|||
// Apply the update.
|
||||
resp, err := t.srv.raftApply(structs.TxnRequestType, args)
|
||||
if err != nil {
|
||||
t.logger.Error("Raft apply failed", "error", err)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
return fmt.Errorf("raft apply failed: %w", err)
|
||||
}
|
||||
|
||||
// Convert the return type. This should be a cheap copy since we are
|
||||
|
|
Loading…
Reference in New Issue