rpc: extract rpcQueryTimeout method

This helps keep the logic in blockingQuery more focused. In the future we
may have a separate struct for RPC queries which may allow us to move this
off of Server.
This commit is contained in:
Daniel Nephin 2022-01-17 16:33:17 -05:00
parent 751bc2e7d3
commit a28d1268cb

View file

@ -938,19 +938,8 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
return err
}
queryTimeout := queryOpts.GetMaxQueryTime()
// Restrict the max query time, and ensure there is always one.
if queryTimeout > s.config.MaxQueryTime {
queryTimeout = s.config.MaxQueryTime
} else if queryTimeout <= 0 {
queryTimeout = s.config.DefaultQueryTime
}
// Apply a small amount of jitter to the request.
queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction)
// wrap the base context with a deadline
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
timeout := s.rpcQueryTimeout(queryOpts.GetMaxQueryTime())
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// instrument blockingQueries
@ -1068,6 +1057,22 @@ func (s *Server) consistentRead() error {
return structs.ErrNotReadyForConsistentReads
}
// rpcQueryTimeout calculates the timeout for the query, ensures it is
// constrained to the configured limit, and adds jitter to prevent multiple
// blocking queries from all timing out at the same time.
func (s *Server) rpcQueryTimeout(queryTimeout time.Duration) time.Duration {
// Restrict the max query time, and ensure there is always one.
if queryTimeout > s.config.MaxQueryTime {
queryTimeout = s.config.MaxQueryTime
} else if queryTimeout <= 0 {
queryTimeout = s.config.DefaultQueryTime
}
// Apply a small amount of jitter to the request.
queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction)
return queryTimeout
}
// maskResultsFilteredByACLs blanks out the ResultsFilteredByACLs flag if the
// request is unauthenticated, to limit information leaking.
//