open-nomad/vendor/github.com/sethgrid/pester/main.go

424 lines
11 KiB
Go
Raw Normal View History

2016-08-02 21:28:39 +00:00
package pester
// pester provides additional resiliency over the standard http client methods by
// allowing you to control concurrency, retries, and a backoff strategy.
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
"net/http"
"net/url"
"sync"
"time"
)
// Client wraps the http client and exposes all the functionality of the http.Client.
// Additionally, Client provides pester specific values for handling resiliency.
type Client struct {
// wrap it to provide access to http built ins
hc *http.Client
Transport http.RoundTripper
CheckRedirect func(req *http.Request, via []*http.Request) error
Jar http.CookieJar
Timeout time.Duration
// pester specific
Concurrency int
MaxRetries int
Backoff BackoffStrategy
KeepLog bool
SuccessReqNum int
SuccessRetryNum int
wg *sync.WaitGroup
sync.Mutex
ErrLog []ErrEntry
}
// ErrEntry is used to provide the LogString() data and is populated
// each time an error happens if KeepLog is set.
// ErrEntry.Retry is deprecated in favor of ErrEntry.Attempt
type ErrEntry struct {
Time time.Time
Method string
URL string
Verb string
Request int
Retry int
Attempt int
Err error
}
// result simplifies the channel communication for concurrent request handling
type result struct {
resp *http.Response
err error
req int
retry int
}
// params represents all the params needed to run http client calls and pester errors
type params struct {
method string
verb string
req *http.Request
url string
bodyType string
body io.Reader
data url.Values
}
// New constructs a new DefaultClient with sensible default values
func New() *Client {
return &Client{
Concurrency: DefaultClient.Concurrency,
MaxRetries: DefaultClient.MaxRetries,
Backoff: DefaultClient.Backoff,
ErrLog: DefaultClient.ErrLog,
wg: &sync.WaitGroup{},
}
}
// NewExtendedClient allows you to pass in an http.Client that is previously set up
// and extends it to have Pester's features of concurrency and retries.
func NewExtendedClient(hc *http.Client) *Client {
c := New()
c.hc = hc
return c
}
// BackoffStrategy is used to determine how long a retry request should wait until attempted
type BackoffStrategy func(retry int) time.Duration
// DefaultClient provides sensible defaults
var DefaultClient = &Client{Concurrency: 1, MaxRetries: 3, Backoff: DefaultBackoff, ErrLog: []ErrEntry{}}
// DefaultBackoff always returns 1 second
func DefaultBackoff(_ int) time.Duration {
return 1 * time.Second
}
// ExponentialBackoff returns ever increasing backoffs by a power of 2
func ExponentialBackoff(i int) time.Duration {
return time.Duration(math.Pow(2, float64(i))) * time.Second
}
// ExponentialJitterBackoff returns ever increasing backoffs by a power of 2
// with +/- 0-33% to prevent sychronized reuqests.
func ExponentialJitterBackoff(i int) time.Duration {
return jitter(int(math.Pow(2, float64(i))))
}
// LinearBackoff returns increasing durations, each a second longer than the last
func LinearBackoff(i int) time.Duration {
return time.Duration(i) * time.Second
}
// LinearJitterBackoff returns increasing durations, each a second longer than the last
// with +/- 0-33% to prevent sychronized reuqests.
func LinearJitterBackoff(i int) time.Duration {
return jitter(i)
}
// jitter keeps the +/- 0-33% logic in one place
func jitter(i int) time.Duration {
ms := i * 1000
maxJitter := ms / 3
rand.Seed(time.Now().Unix())
jitter := rand.Intn(maxJitter + 1)
if rand.Intn(2) == 1 {
ms = ms + jitter
} else {
ms = ms - jitter
}
// a jitter of 0 messes up the time.Tick chan
if ms <= 0 {
ms = 1
}
return time.Duration(ms) * time.Millisecond
}
// Wait blocks until all pester requests have returned
// Probably not that useful outside of testing.
func (c *Client) Wait() {
c.wg.Wait()
}
// pester provides all the logic of retries, concurrency, backoff, and logging
func (c *Client) pester(p params) (*http.Response, error) {
resultCh := make(chan result)
multiplexCh := make(chan result)
finishCh := make(chan struct{})
// track all requests that go out so we can close the late listener routine that closes late incoming response bodies
totalSentRequests := &sync.WaitGroup{}
totalSentRequests.Add(1)
defer totalSentRequests.Done()
allRequestsBackCh := make(chan struct{})
go func() {
totalSentRequests.Wait()
close(allRequestsBackCh)
}()
// GET calls should be idempotent and can make use
// of concurrency. Other verbs can mutate and should not
// make use of the concurrency feature
concurrency := c.Concurrency
if p.verb != "GET" {
concurrency = 1
}
c.Lock()
if c.hc == nil {
c.hc = &http.Client{}
c.hc.Transport = c.Transport
c.hc.CheckRedirect = c.CheckRedirect
c.hc.Jar = c.Jar
c.hc.Timeout = c.Timeout
}
c.Unlock()
// re-create the http client so we can leverage the std lib
httpClient := http.Client{
Transport: c.hc.Transport,
CheckRedirect: c.hc.CheckRedirect,
Jar: c.hc.Jar,
Timeout: c.hc.Timeout,
}
// if we have a request body, we need to save it for later
var originalRequestBody []byte
var originalBody []byte
var err error
if p.req != nil && p.req.Body != nil {
originalRequestBody, err = ioutil.ReadAll(p.req.Body)
if err != nil {
return &http.Response{}, errors.New("error reading request body")
}
p.req.Body.Close()
}
if p.body != nil {
originalBody, err = ioutil.ReadAll(p.body)
if err != nil {
return &http.Response{}, errors.New("error reading body")
}
}
AttemptLimit := c.MaxRetries
if AttemptLimit <= 0 {
AttemptLimit = 1
}
for req := 0; req < concurrency; req++ {
c.wg.Add(1)
totalSentRequests.Add(1)
go func(n int, p params) {
defer c.wg.Done()
defer totalSentRequests.Done()
var err error
for i := 1; i <= AttemptLimit; i++ {
c.wg.Add(1)
defer c.wg.Done()
select {
case <-finishCh:
return
default:
}
resp := &http.Response{}
// rehydrate the body (it is drained each read)
if len(originalRequestBody) > 0 {
p.req.Body = ioutil.NopCloser(bytes.NewBuffer(originalRequestBody))
}
if len(originalBody) > 0 {
p.body = bytes.NewBuffer(originalBody)
}
// route the calls
switch p.method {
case "Do":
resp, err = httpClient.Do(p.req)
case "Get":
resp, err = httpClient.Get(p.url)
case "Head":
resp, err = httpClient.Head(p.url)
case "Post":
resp, err = httpClient.Post(p.url, p.bodyType, p.body)
case "PostForm":
resp, err = httpClient.PostForm(p.url, p.data)
}
// Early return if we have a valid result
// Only retry (ie, continue the loop) on 5xx status codes
if err == nil && resp.StatusCode < 500 {
multiplexCh <- result{resp: resp, err: err, req: n, retry: i}
return
}
c.log(ErrEntry{
Time: time.Now(),
Method: p.method,
Verb: p.verb,
URL: p.url,
Request: n,
Retry: i + 1, // would remove, but would break backward compatibility
Attempt: i,
Err: err,
})
// if it is the last iteration, grab the result (which is an error at this point)
if i == AttemptLimit {
multiplexCh <- result{resp: resp, err: err}
return
}
// if we are retrying, we should close this response body to free the fd
if resp != nil {
resp.Body.Close()
}
// prevent a 0 from causing the tick to block, pass additional microsecond
<-time.Tick(c.Backoff(i) + 1*time.Microsecond)
}
}(req, p)
}
// spin off the go routine so it can continually listen in on late results and close the response bodies
go func() {
gotFirstResult := false
for {
select {
case res := <-multiplexCh:
if !gotFirstResult {
gotFirstResult = true
close(finishCh)
resultCh <- res
} else if res.resp != nil {
// we only return one result to the caller; close all other response bodies that come back
// drain the body before close as to not prevent keepalive. see https://gist.github.com/mholt/eba0f2cc96658be0f717
io.Copy(ioutil.Discard, res.resp.Body)
res.resp.Body.Close()
}
case <-allRequestsBackCh:
// don't leave this goroutine running
return
}
}
}()
select {
case res := <-resultCh:
c.Lock()
defer c.Unlock()
c.SuccessReqNum = res.req
c.SuccessRetryNum = res.retry
return res.resp, res.err
}
}
// LogString provides a string representation of the errors the client has seen
func (c *Client) LogString() string {
c.Lock()
defer c.Unlock()
var res string
for _, e := range c.ErrLog {
res += fmt.Sprintf("%d %s [%s] %s request-%d retry-%d error: %s\n",
e.Time.Unix(), e.Method, e.Verb, e.URL, e.Request, e.Retry, e.Err)
}
return res
}
// LogErrCount is a helper method used primarily for test validation
func (c *Client) LogErrCount() int {
c.Lock()
defer c.Unlock()
return len(c.ErrLog)
}
// EmbedHTTPClient allows you to extend an existing Pester client with an
// underlying http.Client, such as https://godoc.org/golang.org/x/oauth2/google#DefaultClient
func (c *Client) EmbedHTTPClient(hc *http.Client) {
c.hc = hc
}
func (c *Client) log(e ErrEntry) {
if c.KeepLog {
c.Lock()
c.ErrLog = append(c.ErrLog, e)
c.Unlock()
}
}
// Do provides the same functionality as http.Client.Do
func (c *Client) Do(req *http.Request) (resp *http.Response, err error) {
return c.pester(params{method: "Do", req: req, verb: req.Method, url: req.URL.String()})
}
// Get provides the same functionality as http.Client.Get
func (c *Client) Get(url string) (resp *http.Response, err error) {
return c.pester(params{method: "Get", url: url, verb: "GET"})
}
// Head provides the same functionality as http.Client.Head
func (c *Client) Head(url string) (resp *http.Response, err error) {
return c.pester(params{method: "Head", url: url, verb: "HEAD"})
}
// Post provides the same functionality as http.Client.Post
func (c *Client) Post(url string, bodyType string, body io.Reader) (resp *http.Response, err error) {
return c.pester(params{method: "Post", url: url, bodyType: bodyType, body: body, verb: "POST"})
}
// PostForm provides the same functionality as http.Client.PostForm
func (c *Client) PostForm(url string, data url.Values) (resp *http.Response, err error) {
return c.pester(params{method: "PostForm", url: url, data: data, verb: "POST"})
}
////////////////////////////////////////
// Provide self-constructing variants //
////////////////////////////////////////
// Do provides the same functionality as http.Client.Do and creates its own constructor
func Do(req *http.Request) (resp *http.Response, err error) {
c := New()
return c.Do(req)
}
// Get provides the same functionality as http.Client.Get and creates its own constructor
func Get(url string) (resp *http.Response, err error) {
c := New()
return c.Get(url)
}
// Head provides the same functionality as http.Client.Head and creates its own constructor
func Head(url string) (resp *http.Response, err error) {
c := New()
return c.Head(url)
}
// Post provides the same functionality as http.Client.Post and creates its own constructor
func Post(url string, bodyType string, body io.Reader) (resp *http.Response, err error) {
c := New()
return c.Post(url, bodyType, body)
}
// PostForm provides the same functionality as http.Client.PostForm and creates its own constructor
func PostForm(url string, data url.Values) (resp *http.Response, err error) {
c := New()
return c.PostForm(url, data)
}