open-vault/api/client.go

1394 lines
39 KiB
Go
Raw Normal View History

2015-03-04 21:10:10 +00:00
package api
import (
2018-05-11 14:42:06 +00:00
"context"
"crypto/hmac"
"crypto/sha256"
"crypto/tls"
"encoding/base64"
"encoding/hex"
2015-04-20 18:30:35 +00:00
"fmt"
"net"
2015-03-04 21:10:10 +00:00
"net/http"
2015-03-09 18:38:50 +00:00
"net/url"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
"unicode"
2017-11-06 17:06:19 +00:00
"github.com/hashicorp/errwrap"
cleanhttp "github.com/hashicorp/go-cleanhttp"
retryablehttp "github.com/hashicorp/go-retryablehttp"
rootcerts "github.com/hashicorp/go-rootcerts"
"github.com/hashicorp/go-secure-stdlib/parseutil"
"golang.org/x/net/http2"
"golang.org/x/time/rate"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/helper/strutil"
"github.com/hashicorp/vault/sdk/logical"
2015-03-04 21:10:10 +00:00
)
// Names of the environment variables understood by this package, together
// with the name of the index header.
const (
	// Addresses of the Vault server and of a local Vault agent.
	EnvVaultAddress   = "VAULT_ADDR"
	EnvVaultAgentAddr = "VAULT_AGENT_ADDR"

	// File-system locations of the TLS material used to reach Vault.
	EnvVaultCACert     = "VAULT_CACERT"
	EnvVaultCAPath     = "VAULT_CAPATH"
	EnvVaultClientCert = "VAULT_CLIENT_CERT"
	EnvVaultClientKey  = "VAULT_CLIENT_KEY"

	// Remaining client settings.
	EnvVaultClientTimeout = "VAULT_CLIENT_TIMEOUT"
	EnvVaultSRVLookup     = "VAULT_SRV_LOOKUP"
	EnvVaultSkipVerify    = "VAULT_SKIP_VERIFY"
	EnvVaultNamespace     = "VAULT_NAMESPACE"
	EnvVaultTLSServerName = "VAULT_TLS_SERVER_NAME"
	EnvVaultWrapTTL       = "VAULT_WRAP_TTL"
	EnvVaultMaxRetries    = "VAULT_MAX_RETRIES"
	EnvVaultToken         = "VAULT_TOKEN"
	EnvVaultMFA           = "VAULT_MFA"
	EnvRateLimit          = "VAULT_RATE_LIMIT"
	EnvHTTPProxy          = "VAULT_HTTP_PROXY"

	// HeaderIndex is the name of the index HTTP header.
	HeaderIndex = "X-Vault-Index"
)
// Older names retained for backwards compatibility. Each one carries the same
// value as its replacement.
const (
	// EnvVaultAgentAddress names the same variable as EnvVaultAgentAddr.
	//
	// Deprecated: use EnvVaultAgentAddr instead.
	EnvVaultAgentAddress = "VAULT_AGENT_ADDR"

	// EnvVaultInsecure names the same variable as EnvVaultSkipVerify.
	//
	// Deprecated: use EnvVaultSkipVerify instead.
	EnvVaultInsecure = "VAULT_SKIP_VERIFY"
)
// WrappingLookupFunc decides whether a request should be response-wrapped.
//
// It receives the HTTP verb and the request path and returns the wrap TTL as
// a duration string (e.g. "15s", or simply "15"). An empty return value means
// the response is not wrapped. The path never starts with "/v1/", "v1/" or
// "/", but trailing forward slashes are kept, so it must match the called
// path precisely.
type WrappingLookupFunc func(operation, path string) string
2015-03-04 21:10:10 +00:00
// Config is used to configure the creation of the client.
type Config struct {
modifyLock sync.RWMutex
2015-03-04 21:10:10 +00:00
// Address is the address of the Vault server. This should be a complete
// URL such as "http://vault.example.com". If you need a custom SSL
// cert or want to enable insecure mode, you need to specify a custom
// HttpClient.
Address string
// AgentAddress is the address of the local Vault agent. This should be a
// complete URL such as "http://vault.example.com".
AgentAddress string
// HttpClient is the HTTP client to use. Vault sets sane defaults for the
// http.Client and its associated http.Transport created in DefaultConfig.
// If you must modify Vault's defaults, it is suggested that you start with
// that client and modify as needed rather than start with an empty client
// (or http.DefaultClient).
2015-03-04 21:10:10 +00:00
HttpClient *http.Client
// MinRetryWait controls the minimum time to wait before retrying when a 5xx
// error occurs. Defaults to 1000 milliseconds.
MinRetryWait time.Duration
// MaxRetryWait controls the maximum time to wait before retrying when a 5xx
// error occurs. Defaults to 1500 milliseconds.
MaxRetryWait time.Duration
2018-05-09 22:24:41 +00:00
// MaxRetries controls the maximum number of times to retry when a 5xx
// error occurs. Set to 0 to disable retrying. Defaults to 2 (for a total
// of three tries).
MaxRetries int
// Timeout is for setting custom timeout parameter in the HttpClient
Timeout time.Duration
2017-11-06 17:06:19 +00:00
// If there is an error when creating the configuration, this will be the
// error
Error error
2018-05-09 21:44:53 +00:00
// The Backoff function to use; a default is used if not provided
Backoff retryablehttp.Backoff
2018-05-11 14:42:06 +00:00
// The CheckRetry function to use; a default is used if not provided
CheckRetry retryablehttp.CheckRetry
// Logger is the leveled logger to provide to the retryable HTTP client.
Logger retryablehttp.LeveledLogger
2018-05-11 14:42:06 +00:00
// Limiter is the rate limiter used by the client.
// If this pointer is nil, then there will be no limit set.
// In contrast, if this pointer is set, even to an empty struct,
// then that limiter will be used. Note that an empty Limiter
// is equivalent blocking all events.
Limiter *rate.Limiter
// OutputCurlString causes the actual request to return an error of type
// *OutputStringError. Type asserting the error message will allow
// fetching a cURL-compatible string for the operation.
//
// Note: It is not thread-safe to set this and make concurrent requests
// with the same client. Cloning a client will not clone this value.
OutputCurlString bool
// SRVLookup enables the client to lookup the host through DNS SRV lookup
SRVLookup bool
// CloneHeaders ensures that the source client's headers are copied to
// its clone.
CloneHeaders bool
// ReadYourWrites ensures isolated read-after-write semantics by
// providing discovered cluster replication states in each request.
// The shared state is automatically propagated to all Client clones.
//
// Note: Careful consideration should be made prior to enabling this setting
// since there will be a performance penalty paid upon each request.
// This feature requires Enterprise server-side.
ReadYourWrites bool
2015-03-04 21:10:10 +00:00
}
2016-08-02 20:17:45 +00:00
// TLSConfig groups the TLS parameters that can be applied to the HTTP client
// used to communicate with Vault.
type TLSConfig struct {
	// CACert is the path of a PEM-encoded CA certificate file used to
	// verify the Vault server SSL certificate.
	CACert string

	// CAPath is the path of a directory of PEM-encoded CA certificate files
	// used to verify the Vault server SSL certificate.
	CAPath string

	// ClientCert is the path of the certificate for Vault communication.
	ClientCert string

	// ClientKey is the path of the private key for Vault communication.
	ClientKey string

	// TLSServerName, when non-empty, is the SNI host sent when connecting
	// via TLS.
	TLSServerName string

	// Insecure enables or disables SSL verification.
	Insecure bool
}
2015-03-04 21:10:10 +00:00
// DefaultConfig returns a default configuration for the client. It is
// safe to modify the return value of this function.
2015-04-23 15:45:37 +00:00
//
// The default Address is https://127.0.0.1:8200, but this can be overridden by
2015-04-23 15:46:22 +00:00
// setting the `VAULT_ADDR` environment variable.
//
// If an error is encountered, this will return nil.
2015-04-23 15:13:52 +00:00
func DefaultConfig() *Config {
config := &Config{
Address: "https://127.0.0.1:8200",
HttpClient: cleanhttp.DefaultPooledClient(),
Timeout: time.Second * 60,
MinRetryWait: time.Millisecond * 1000,
MaxRetryWait: time.Millisecond * 1500,
MaxRetries: 2,
Backoff: retryablehttp.LinearJitterBackoff,
2015-08-22 00:36:19 +00:00
}
transport := config.HttpClient.Transport.(*http.Transport)
transport.TLSHandshakeTimeout = 10 * time.Second
transport.TLSClientConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
}
if err := http2.ConfigureTransport(transport); err != nil {
2017-11-06 17:06:19 +00:00
config.Error = err
return config
}
if err := config.ReadEnvironment(); err != nil {
2017-11-06 17:06:19 +00:00
config.Error = err
return config
}
2015-08-22 00:36:19 +00:00
// Ensure redirects are not automatically followed
// Note that this is sane for the API client as it has its own
// redirect handling logic (and thus also for command/meta),
// but in e.g. http_test actual redirect handling is necessary
config.HttpClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
// Returning this value causes the Go net library to not close the
2018-05-09 21:44:53 +00:00
// response body and to nil out the error. Otherwise retry clients may
// try three times on every redirect because it sees an error from this
// function (to prevent redirects) passing through to it.
return http.ErrUseLastResponse
}
2015-03-04 21:10:10 +00:00
return config
}
// ConfigureTLS takes a set of TLS configurations and applies those to the
// HTTP client.
2016-08-02 20:17:45 +00:00
func (c *Config) ConfigureTLS(t *TLSConfig) error {
if c.HttpClient == nil {
c.HttpClient = DefaultConfig().HttpClient
}
2017-11-06 17:06:19 +00:00
clientTLSConfig := c.HttpClient.Transport.(*http.Transport).TLSClientConfig
2016-08-02 20:17:45 +00:00
var clientCert tls.Certificate
foundClientCert := false
2017-11-06 17:06:19 +00:00
switch {
case t.ClientCert != "" && t.ClientKey != "":
var err error
clientCert, err = tls.LoadX509KeyPair(t.ClientCert, t.ClientKey)
if err != nil {
return err
}
2017-11-06 17:06:19 +00:00
foundClientCert = true
case t.ClientCert != "" || t.ClientKey != "":
return fmt.Errorf("both client cert and client key must be provided")
}
2017-11-06 17:06:19 +00:00
if t.CACert != "" || t.CAPath != "" {
rootConfig := &rootcerts.Config{
CAFile: t.CACert,
CAPath: t.CAPath,
}
if err := rootcerts.ConfigureTLS(clientTLSConfig, rootConfig); err != nil {
return err
}
}
2017-11-06 17:06:19 +00:00
if t.Insecure {
clientTLSConfig.InsecureSkipVerify = true
}
if foundClientCert {
// We use this function to ignore the server's preferential list of
// CAs, otherwise any CA used for the cert auth backend must be in the
// server's CA pool
clientTLSConfig.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
return &clientCert, nil
}
}
2017-11-06 17:06:19 +00:00
2016-08-02 20:17:45 +00:00
if t.TLSServerName != "" {
clientTLSConfig.ServerName = t.TLSServerName
}
return nil
}
// ReadEnvironment reads configuration information from the environment. If
// there is an error, no configuration value is updated.
func (c *Config) ReadEnvironment() error {
var envAddress string
var envAgentAddress string
var envCACert string
var envCAPath string
var envClientCert string
var envClientKey string
var envClientTimeout time.Duration
2016-08-02 20:17:45 +00:00
var envInsecure bool
var envTLSServerName string
var envMaxRetries *uint64
var envSRVLookup bool
2018-05-11 14:42:06 +00:00
var limit *rate.Limiter
var envHTTPProxy string
// Parse the environment variables
if v := os.Getenv(EnvVaultAddress); v != "" {
envAddress = v
}
if v := os.Getenv(EnvVaultAgentAddr); v != "" {
envAgentAddress = v
} else if v := os.Getenv(EnvVaultAgentAddress); v != "" {
envAgentAddress = v
}
if v := os.Getenv(EnvVaultMaxRetries); v != "" {
maxRetries, err := strconv.ParseUint(v, 10, 32)
if err != nil {
return err
}
envMaxRetries = &maxRetries
}
if v := os.Getenv(EnvVaultCACert); v != "" {
envCACert = v
}
if v := os.Getenv(EnvVaultCAPath); v != "" {
envCAPath = v
}
if v := os.Getenv(EnvVaultClientCert); v != "" {
envClientCert = v
}
if v := os.Getenv(EnvVaultClientKey); v != "" {
envClientKey = v
}
2018-05-11 14:42:06 +00:00
if v := os.Getenv(EnvRateLimit); v != "" {
rateLimit, burstLimit, err := parseRateLimit(v)
if err != nil {
return err
}
limit = rate.NewLimiter(rate.Limit(rateLimit), burstLimit)
}
if t := os.Getenv(EnvVaultClientTimeout); t != "" {
clientTimeout, err := parseutil.ParseDurationSecond(t)
if err != nil {
return fmt.Errorf("could not parse %q", EnvVaultClientTimeout)
}
envClientTimeout = clientTimeout
}
if v := os.Getenv(EnvVaultSkipVerify); v != "" {
var err error
envInsecure, err = strconv.ParseBool(v)
if err != nil {
return fmt.Errorf("could not parse VAULT_SKIP_VERIFY")
}
} else if v := os.Getenv(EnvVaultInsecure); v != "" {
var err error
envInsecure, err = strconv.ParseBool(v)
if err != nil {
return fmt.Errorf("could not parse VAULT_INSECURE")
}
}
if v := os.Getenv(EnvVaultSRVLookup); v != "" {
var err error
envSRVLookup, err = strconv.ParseBool(v)
if err != nil {
return fmt.Errorf("could not parse %s", EnvVaultSRVLookup)
}
}
if v := os.Getenv(EnvVaultTLSServerName); v != "" {
envTLSServerName = v
}
if v := os.Getenv(EnvHTTPProxy); v != "" {
envHTTPProxy = v
}
// Configure the HTTP clients TLS configuration.
2016-08-02 20:17:45 +00:00
t := &TLSConfig{
CACert: envCACert,
CAPath: envCAPath,
ClientCert: envClientCert,
ClientKey: envClientKey,
TLSServerName: envTLSServerName,
Insecure: envInsecure,
}
c.modifyLock.Lock()
defer c.modifyLock.Unlock()
c.SRVLookup = envSRVLookup
2018-05-11 14:42:06 +00:00
c.Limiter = limit
2016-08-02 20:17:45 +00:00
if err := c.ConfigureTLS(t); err != nil {
return err
}
if envAddress != "" {
c.Address = envAddress
}
if envAgentAddress != "" {
c.AgentAddress = envAgentAddress
}
if envMaxRetries != nil {
2018-05-09 22:24:41 +00:00
c.MaxRetries = int(*envMaxRetries)
}
if envClientTimeout != 0 {
c.Timeout = envClientTimeout
}
if envHTTPProxy != "" {
url, err := url.Parse(envHTTPProxy)
if err != nil {
return err
}
transport := c.HttpClient.Transport.(*http.Transport)
transport.Proxy = http.ProxyURL(url)
}
return nil
}
2018-05-11 14:42:06 +00:00
// parseRateLimit parses the value of the rate-limit environment variable.
//
// Two forms are accepted: "rate:burst" (a float and an integer) and a bare
// "rate". In the bare form the burst is the rate truncated to an integer, but
// never less than 1 for a positive rate: a limiter with a burst of zero
// admits no events at all, so a fractional rate such as "0.5" would otherwise
// block every request.
//
// On malformed input it returns zero values and an error naming the variable.
func parseRateLimit(val string) (rate float64, burst int, err error) {
	if _, err = fmt.Sscanf(val, "%f:%d", &rate, &burst); err == nil {
		return rate, burst, nil
	}

	// Not in "rate:burst" form; fall back to a bare rate.
	rate, err = strconv.ParseFloat(val, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("%v was provided but incorrectly formatted", EnvRateLimit)
	}

	burst = int(rate)
	if burst < 1 && rate > 0 {
		burst = 1
	}
	return rate, burst, nil
}
// Client is the client to the Vault API. Create a client with NewClient.
2015-03-04 21:10:10 +00:00
type Client struct {
modifyLock sync.RWMutex
addr *url.URL
config *Config
token string
headers http.Header
wrappingLookupFunc WrappingLookupFunc
mfaCreds []string
policyOverride bool
requestCallbacks []RequestCallback
responseCallbacks []ResponseCallback
replicationStateStore *replicationStateStore
2017-10-23 20:52:56 +00:00
}
2015-03-04 21:10:10 +00:00
// NewClient returns a new client for the given configuration.
2015-04-23 15:45:37 +00:00
//
// If the configuration is nil, Vault will use configuration from
// DefaultConfig(), which is the recommended starting configuration.
//
2015-04-23 15:45:37 +00:00
// If the environment variable `VAULT_TOKEN` is present, the token will be
// automatically added to the client. Otherwise, you must manually call
// `SetToken()`.
2015-04-23 15:13:52 +00:00
func NewClient(c *Config) (*Client, error) {
def := DefaultConfig()
if def == nil {
return nil, fmt.Errorf("could not create/read default configuration")
}
2017-11-06 17:06:19 +00:00
if def.Error != nil {
return nil, errwrap.Wrapf("error encountered setting up default configuration: {{err}}", def.Error)
}
if c == nil {
c = def
}
c.modifyLock.Lock()
defer c.modifyLock.Unlock()
if c.MinRetryWait == 0 {
c.MinRetryWait = def.MinRetryWait
}
if c.MaxRetryWait == 0 {
c.MaxRetryWait = def.MaxRetryWait
}
2019-02-15 18:40:03 +00:00
if c.HttpClient == nil {
c.HttpClient = def.HttpClient
}
if c.HttpClient.Transport == nil {
c.HttpClient.Transport = def.HttpClient.Transport
}
address := c.Address
if c.AgentAddress != "" {
address = c.AgentAddress
}
2019-03-05 20:20:16 +00:00
u, err := url.Parse(address)
if err != nil {
return nil, err
}
if strings.HasPrefix(address, "unix://") {
socket := strings.TrimPrefix(address, "unix://")
2019-02-15 18:40:03 +00:00
transport := c.HttpClient.Transport.(*http.Transport)
transport.DialContext = func(context.Context, string, string) (net.Conn, error) {
return net.Dial("unix", socket)
}
2019-03-05 20:20:16 +00:00
// Since the address points to a unix domain socket, the scheme in the
// *URL would be set to `unix`. The *URL in the client is expected to
// be pointing to the protocol used in the application layer and not to
// the transport layer. Hence, setting the fields accordingly.
u.Scheme = "http"
u.Host = socket
u.Path = ""
2015-03-09 18:38:50 +00:00
}
client := &Client{
addr: u,
config: c,
headers: make(http.Header),
}
if c.ReadYourWrites {
client.replicationStateStore = &replicationStateStore{}
}
// Add the VaultRequest SSRF protection header
client.headers[consts.RequestHeaderName] = []string{"true"}
if token := os.Getenv(EnvVaultToken); token != "" {
client.token = token
}
if namespace := os.Getenv(EnvVaultNamespace); namespace != "" {
client.setNamespace(namespace)
}
return client, nil
2015-03-04 21:10:10 +00:00
}
2015-03-09 18:38:50 +00:00
// CloneConfig returns a new Config, created by DefaultConfig and then
// overwritten with the client's current settings.
//
// The configuration is read under its own read lock, like every other
// accessor on Client: setters such as SetMaxRetries hold only the client's
// read lock while writing the configuration, so the client lock alone does
// not exclude them.
//
// HttpClient is a shallow copy of the client's http.Client: the struct is
// duplicated, but the Transport it refers to is still shared.
func (c *Client) CloneConfig() *Config {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	newConfig := DefaultConfig()

	src := c.config
	src.modifyLock.RLock()
	defer src.modifyLock.RUnlock()

	newConfig.Address = src.Address
	newConfig.AgentAddress = src.AgentAddress
	newConfig.MinRetryWait = src.MinRetryWait
	newConfig.MaxRetryWait = src.MaxRetryWait
	newConfig.MaxRetries = src.MaxRetries
	newConfig.Timeout = src.Timeout
	newConfig.Backoff = src.Backoff
	newConfig.CheckRetry = src.CheckRetry
	newConfig.Logger = src.Logger
	newConfig.Limiter = src.Limiter
	newConfig.OutputCurlString = src.OutputCurlString
	newConfig.SRVLookup = src.SRVLookup
	newConfig.CloneHeaders = src.CloneHeaders
	newConfig.ReadYourWrites = src.ReadYourWrites

	// we specifically want a _copy_ of the client here, not a pointer to the
	// original one
	newClient := *src.HttpClient
	newConfig.HttpClient = &newClient

	return newConfig
}
// SetAddress sets the address of Vault in the client. The format of address
// should be "<Scheme>://<Host>:<Port>". Setting this on a client will
// override the value of the VAULT_ADDR environment variable.
//
// An address that cannot be parsed as a URL yields an error and leaves the
// client unchanged.
func (c *Client) SetAddress(addr string) error {
	u, parseErr := url.Parse(addr)
	if parseErr != nil {
		return errwrap.Wrapf("failed to set address: {{err}}", parseErr)
	}

	c.modifyLock.Lock()
	defer c.modifyLock.Unlock()

	cfg := c.config
	cfg.modifyLock.Lock()
	cfg.Address = addr
	cfg.modifyLock.Unlock()

	c.addr = u
	return nil
}
// Address reports, as a string, the Vault URL the client is configured to
// connect to.
func (c *Client) Address() string {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	u := c.addr
	return u.String()
}
2018-05-11 14:42:06 +00:00
// SetLimiter will set the rate limiter for this client.
// This method is thread-safe.
// rateLimit and burst are specified according to https://godoc.org/golang.org/x/time/rate#NewLimiter
func (c *Client) SetLimiter(rateLimit float64, burst int) {
c.modifyLock.RLock()
defer c.modifyLock.RUnlock()
2018-05-11 14:42:06 +00:00
c.config.modifyLock.Lock()
defer c.config.modifyLock.Unlock()
2018-05-11 14:42:06 +00:00
c.config.Limiter = rate.NewLimiter(rate.Limit(rateLimit), burst)
}
// Limiter returns the rate limiter currently stored in the client's
// configuration; it may be nil.
func (c *Client) Limiter() *rate.Limiter {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.RLock()
	defer cfg.modifyLock.RUnlock()

	return cfg.Limiter
}
// SetMinRetryWait sets the minimum time to wait before retrying in the case
// of certain errors.
func (c *Client) SetMinRetryWait(retryWait time.Duration) {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.Lock()
	defer cfg.modifyLock.Unlock()

	cfg.MinRetryWait = retryWait
}
// MinRetryWait returns the configured minimum wait before a retry.
func (c *Client) MinRetryWait() time.Duration {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.RLock()
	defer cfg.modifyLock.RUnlock()

	return cfg.MinRetryWait
}
// SetMaxRetryWait sets the maximum time to wait before retrying in the case
// of certain errors.
func (c *Client) SetMaxRetryWait(retryWait time.Duration) {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.Lock()
	defer cfg.modifyLock.Unlock()

	cfg.MaxRetryWait = retryWait
}
// MaxRetryWait returns the configured maximum wait before a retry.
func (c *Client) MaxRetryWait() time.Duration {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.RLock()
	defer cfg.modifyLock.RUnlock()

	return cfg.MaxRetryWait
}
2017-03-01 17:23:54 +00:00
// SetMaxRetries sets the number of retries that will be used in the case of certain errors
func (c *Client) SetMaxRetries(retries int) {
c.modifyLock.RLock()
defer c.modifyLock.RUnlock()
c.config.modifyLock.Lock()
defer c.config.modifyLock.Unlock()
2017-03-01 17:23:54 +00:00
c.config.MaxRetries = retries
}
// MaxRetries returns the configured number of retries.
func (c *Client) MaxRetries() int {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.RLock()
	defer cfg.modifyLock.RUnlock()

	return cfg.MaxRetries
}
// SetSRVLookup stores srv as the configuration's SRVLookup setting.
func (c *Client) SetSRVLookup(srv bool) {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.Lock()
	defer cfg.modifyLock.Unlock()

	cfg.SRVLookup = srv
}
// SRVLookup returns the configuration's SRVLookup setting.
func (c *Client) SRVLookup() bool {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.RLock()
	defer cfg.modifyLock.RUnlock()

	return cfg.SRVLookup
}
// SetCheckRetry installs checkRetry as the CheckRetry function to be used for
// future requests.
func (c *Client) SetCheckRetry(checkRetry retryablehttp.CheckRetry) {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.Lock()
	defer cfg.modifyLock.Unlock()

	cfg.CheckRetry = checkRetry
}
// CheckRetry returns the CheckRetry function stored in the configuration; it
// may be nil.
func (c *Client) CheckRetry() retryablehttp.CheckRetry {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.RLock()
	defer cfg.modifyLock.RUnlock()

	return cfg.CheckRetry
}
// SetClientTimeout sets the client request timeout.
func (c *Client) SetClientTimeout(timeout time.Duration) {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.Lock()
	defer cfg.modifyLock.Unlock()

	cfg.Timeout = timeout
}
// ClientTimeout returns the configured client request timeout.
func (c *Client) ClientTimeout() time.Duration {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.RLock()
	defer cfg.modifyLock.RUnlock()

	return cfg.Timeout
}
// OutputCurlString returns the configuration's OutputCurlString setting.
func (c *Client) OutputCurlString() bool {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.RLock()
	defer cfg.modifyLock.RUnlock()

	return cfg.OutputCurlString
}
// SetOutputCurlString stores curl as the configuration's OutputCurlString
// setting.
func (c *Client) SetOutputCurlString(curl bool) {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.Lock()
	defer cfg.modifyLock.Unlock()

	cfg.OutputCurlString = curl
}
Fix response wrapping from K/V version 2 (#4511) This takes place in two parts, since working on this exposed an issue with response wrapping when there is a raw body set. The changes are (in diff order): * A CurrentWrappingLookupFunc has been added to return the current value. This is necessary for the lookahead call since we don't want the lookahead call to be wrapped. * Support for unwrapping < 0.6.2 tokens via the API/CLI has been removed, because we now have backends returning 404s with data and can't rely on the 404 trick. These can still be read manually via cubbyhole/response. * KV preflight version request now ensures that its calls is not wrapped, and restores any given function after. * When responding with a raw body, instead of always base64-decoding a string value and erroring on failure, on failure we assume that it simply wasn't a base64-encoded value and use it as is. * A test that fails on master and works now that ensures that raw body responses that are wrapped and then unwrapped return the expected values. * A flag for response data that indicates to the wrapping handling that the data contained therein is already JSON decoded (more later). * RespondWithStatusCode now defaults to a string so that the value is HMAC'd during audit. The function always JSON encodes the body, so before now it was always returning []byte which would skip HMACing. We don't know what's in the data, so this is a "better safe than sorry" issue. If different behavior is needed, backends can always manually populate the data instead of relying on the helper function. * We now check unwrapped data after unwrapping to see if there were raw flags. If so, we try to detect whether the value can be unbase64'd. The reason is that if it can it was probably originally a []byte and shouldn't be audit HMAC'd; if not, it was probably originally a string and should be. 
In either case, we then set the value as the raw body and hit the flag indicating that it's already been JSON decoded so not to try again before auditing. Doing it this way ensures the right typing. * There is now a check to see if the data coming from unwrapping is already JSON decoded and if so the decoding is skipped before setting the audit response.
2018-05-10 19:40:03 +00:00
// CurrentWrappingLookupFunc sets a lookup function that returns desired wrap TTLs
// for a given operation and path.
Fix response wrapping from K/V version 2 (#4511) This takes place in two parts, since working on this exposed an issue with response wrapping when there is a raw body set. The changes are (in diff order): * A CurrentWrappingLookupFunc has been added to return the current value. This is necessary for the lookahead call since we don't want the lookahead call to be wrapped. * Support for unwrapping < 0.6.2 tokens via the API/CLI has been removed, because we now have backends returning 404s with data and can't rely on the 404 trick. These can still be read manually via cubbyhole/response. * KV preflight version request now ensures that its calls is not wrapped, and restores any given function after. * When responding with a raw body, instead of always base64-decoding a string value and erroring on failure, on failure we assume that it simply wasn't a base64-encoded value and use it as is. * A test that fails on master and works now that ensures that raw body responses that are wrapped and then unwrapped return the expected values. * A flag for response data that indicates to the wrapping handling that the data contained therein is already JSON decoded (more later). * RespondWithStatusCode now defaults to a string so that the value is HMAC'd during audit. The function always JSON encodes the body, so before now it was always returning []byte which would skip HMACing. We don't know what's in the data, so this is a "better safe than sorry" issue. If different behavior is needed, backends can always manually populate the data instead of relying on the helper function. * We now check unwrapped data after unwrapping to see if there were raw flags. If so, we try to detect whether the value can be unbase64'd. The reason is that if it can it was probably originally a []byte and shouldn't be audit HMAC'd; if not, it was probably originally a string and should be. 
In either case, we then set the value as the raw body and hit the flag indicating that it's already been JSON decoded so not to try again before auditing. Doing it this way ensures the right typing. * There is now a check to see if the data coming from unwrapping is already JSON decoded and if so the decoding is skipped before setting the audit response.
2018-05-10 19:40:03 +00:00
func (c *Client) CurrentWrappingLookupFunc() WrappingLookupFunc {
c.modifyLock.RLock()
defer c.modifyLock.RUnlock()
return c.wrappingLookupFunc
}
// SetWrappingLookupFunc installs lookupFunc as the function that returns
// desired wrap TTLs for a given operation and path.
func (c *Client) SetWrappingLookupFunc(lookupFunc WrappingLookupFunc) {
	c.modifyLock.Lock()
	c.wrappingLookupFunc = lookupFunc
	c.modifyLock.Unlock()
}
// SetMFACreds stores the MFA credentials, supplied either via the environment
// variable or via the command line, on the client. The slice is kept as
// given, not copied.
func (c *Client) SetMFACreds(creds []string) {
	c.modifyLock.Lock()
	c.mfaCreds = creds
	c.modifyLock.Unlock()
}
2018-08-09 22:29:03 +00:00
// SetNamespace sets the namespace, supplied either via the environment
// variable or via the command line, on the client. It takes the client's
// write lock and delegates to setNamespace.
func (c *Client) SetNamespace(namespace string) {
	c.modifyLock.Lock()
	defer c.modifyLock.Unlock()

	c.setNamespace(namespace)
}
2018-08-09 22:29:03 +00:00
func (c *Client) setNamespace(namespace string) {
2018-08-09 22:29:03 +00:00
if c.headers == nil {
c.headers = make(http.Header)
}
2018-08-22 18:37:40 +00:00
c.headers.Set(consts.NamespaceHeaderName, namespace)
2018-08-09 22:29:03 +00:00
}
2015-03-11 22:42:08 +00:00
// Token returns the access token being used by this client. It will
// return the empty string if there is no token set.
func (c *Client) Token() string {
c.modifyLock.RLock()
defer c.modifyLock.RUnlock()
2015-08-22 00:36:19 +00:00
return c.token
2015-03-11 22:42:08 +00:00
}
2015-03-31 04:20:23 +00:00
// SetToken sets the token directly. This won't perform any auth
2015-09-03 14:36:59 +00:00
// verification, it simply sets the token properly for future requests.
2015-03-31 04:20:23 +00:00
func (c *Client) SetToken(v string) {
c.modifyLock.Lock()
defer c.modifyLock.Unlock()
2015-08-22 00:36:19 +00:00
c.token = v
2015-03-31 04:20:23 +00:00
}
2015-09-03 14:36:59 +00:00
// ClearToken deletes the token if it is set or does nothing otherwise.
2015-03-11 22:42:08 +00:00
func (c *Client) ClearToken() {
c.modifyLock.Lock()
defer c.modifyLock.Unlock()
2015-08-22 00:36:19 +00:00
c.token = ""
2015-03-11 22:42:08 +00:00
}
2018-08-09 22:29:03 +00:00
// Headers gets the current set of headers used for requests. This returns a
// copy; to modify it call AddHeader or SetHeaders.
2018-08-09 22:29:03 +00:00
func (c *Client) Headers() http.Header {
c.modifyLock.RLock()
defer c.modifyLock.RUnlock()
if c.headers == nil {
return nil
}
ret := make(http.Header)
for k, v := range c.headers {
for _, val := range v {
ret[k] = append(ret[k], val)
}
}
return ret
}
// AddHeader adds a single header key/value pair under the client's write
// lock.
//
// The header map is created first when the client has none (for example
// after SetHeaders(nil)), matching what setNamespace does; adding to a nil
// map would otherwise panic.
func (c *Client) AddHeader(key, value string) {
	c.modifyLock.Lock()
	defer c.modifyLock.Unlock()

	if c.headers == nil {
		c.headers = make(http.Header)
	}
	c.headers.Add(key, value)
}
// SetHeaders discards all previous headers; only the given ones are used
// going forward. The map is stored as given, not copied.
func (c *Client) SetHeaders(headers http.Header) {
	c.modifyLock.Lock()
	c.headers = headers
	c.modifyLock.Unlock()
}
2018-05-09 21:44:53 +00:00
// SetBackoff sets the backoff function to be used for future requests.
func (c *Client) SetBackoff(backoff retryablehttp.Backoff) {
c.modifyLock.RLock()
defer c.modifyLock.RUnlock()
2018-05-09 21:44:53 +00:00
c.config.modifyLock.Lock()
defer c.config.modifyLock.Unlock()
c.config.Backoff = backoff
}
// SetLogger stores logger as the configuration's Logger.
func (c *Client) SetLogger(logger retryablehttp.LeveledLogger) {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.Lock()
	defer cfg.modifyLock.Unlock()

	cfg.Logger = logger
}
// SetCloneHeaders controls whether headers are copied whenever the client is
// cloned.
func (c *Client) SetCloneHeaders(cloneHeaders bool) {
	c.modifyLock.Lock()
	defer c.modifyLock.Unlock()

	cfg := c.config
	cfg.modifyLock.Lock()
	defer cfg.modifyLock.Unlock()

	cfg.CloneHeaders = cloneHeaders
}
// CloneHeaders returns the configured CloneHeaders value.
func (c *Client) CloneHeaders() bool {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.RLock()
	defer cfg.modifyLock.RUnlock()

	return cfg.CloneHeaders
}
// SetReadYourWrites enables or disables the prevention of stale reads of
// cluster replication state.
//
// Enabling creates a replication state store when the client has none and
// keeps an existing one, so calling it again with true does not discard
// state already held. Disabling drops the store. In both cases the
// configuration's ReadYourWrites flag is set to preventStaleReads.
func (c *Client) SetReadYourWrites(preventStaleReads bool) {
	c.modifyLock.Lock()
	defer c.modifyLock.Unlock()

	c.config.modifyLock.Lock()
	defer c.config.modifyLock.Unlock()

	switch {
	case !preventStaleReads:
		c.replicationStateStore = nil
	case c.replicationStateStore == nil:
		c.replicationStateStore = &replicationStateStore{}
	}

	c.config.ReadYourWrites = preventStaleReads
}
// ReadYourWrites returns the configured value of ReadYourWrites.
func (c *Client) ReadYourWrites() bool {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	cfg := c.config
	cfg.modifyLock.RLock()
	defer cfg.modifyLock.RUnlock()

	return cfg.ReadYourWrites
}
// Clone creates a new client with the same configuration. Note that the same
// underlying http.Client is used; modifying the client from more than one
// goroutine at once may not be safe, so modify the client as needed and then
// clone.
//
// Also, only the client's config is currently copied; this means items not in
// the api.Config struct, such as policy override and wrapping function
// behavior, must currently then be set as desired on the new client.
//
// When the configuration's CloneHeaders is set, the clone receives a copy of
// this client's headers. The replication state store is shared between the
// two clients.
func (c *Client) Clone() (*Client, error) {
	c.modifyLock.RLock()
	defer c.modifyLock.RUnlock()

	config := c.config
	config.modifyLock.RLock()
	defer config.modifyLock.RUnlock()

	newConfig := &Config{
		Address:          config.Address,
		HttpClient:       config.HttpClient,
		MinRetryWait:     config.MinRetryWait,
		MaxRetryWait:     config.MaxRetryWait,
		MaxRetries:       config.MaxRetries,
		Timeout:          config.Timeout,
		Backoff:          config.Backoff,
		CheckRetry:       config.CheckRetry,
		Logger:           config.Logger,
		Limiter:          config.Limiter,
		OutputCurlString: config.OutputCurlString,
		AgentAddress:     config.AgentAddress,
		SRVLookup:        config.SRVLookup,
		CloneHeaders:     config.CloneHeaders,
		ReadYourWrites:   config.ReadYourWrites,
	}
	client, err := NewClient(newConfig)
	if err != nil {
		return nil, err
	}

	if config.CloneHeaders {
		// The read lock is already held, so the header map is copied
		// directly. Going through c.Headers() would take the same read lock
		// a second time, which can deadlock when a writer is waiting in
		// between, and would copy the headers twice.
		client.SetHeaders(c.headers.Clone())
	}

	client.replicationStateStore = c.replicationStateStore

	return client, nil
}
2017-10-23 20:52:56 +00:00
// SetPolicyOverride sets whether requests should be sent with the policy
// override flag to request overriding soft-mandatory Sentinel policies (both
// RGPs and EGPs)
func (c *Client) SetPolicyOverride(override bool) {
c.modifyLock.Lock()
defer c.modifyLock.Unlock()
2017-10-23 20:52:56 +00:00
c.policyOverride = override
}
2015-03-09 18:38:50 +00:00
// NewRequest creates a new raw request object to query the Vault server
// configured for this client. This is an advanced method and generally
// doesn't need to be called externally.
func (c *Client) NewRequest(method, requestPath string) *Request {
c.modifyLock.RLock()
addr := c.addr
token := c.token
mfaCreds := c.mfaCreds
wrappingLookupFunc := c.wrappingLookupFunc
policyOverride := c.policyOverride
c.modifyLock.RUnlock()
host := addr.Host
// if SRV records exist (see https://tools.ietf.org/html/draft-andrews-http-srv-02), lookup the SRV
// record and take the highest match; this is not designed for high-availability, just discovery
// Internet Draft specifies that the SRV record is ignored if a port is given
if addr.Port() == "" && c.config.SRVLookup {
_, addrs, err := net.LookupSRV("http", "tcp", addr.Hostname())
if err == nil && len(addrs) > 0 {
host = fmt.Sprintf("%s:%d", addrs[0].Target, addrs[0].Port)
}
}
2015-08-22 00:36:19 +00:00
req := &Request{
2015-03-09 18:38:50 +00:00
Method: method,
URL: &url.URL{
User: addr.User,
Scheme: addr.Scheme,
Host: host,
Path: path.Join(addr.Path, requestPath),
2015-03-09 18:38:50 +00:00
},
Host: addr.Host,
ClientToken: token,
2015-08-22 00:36:19 +00:00
Params: make(map[string][]string),
2015-03-09 18:38:50 +00:00
}
2015-08-22 00:36:19 +00:00
2016-09-29 04:01:28 +00:00
var lookupPath string
switch {
case strings.HasPrefix(requestPath, "/v1/"):
lookupPath = strings.TrimPrefix(requestPath, "/v1/")
case strings.HasPrefix(requestPath, "v1/"):
lookupPath = strings.TrimPrefix(requestPath, "v1/")
2016-09-29 04:01:28 +00:00
default:
lookupPath = requestPath
2016-09-29 04:01:28 +00:00
}
2017-10-23 20:52:56 +00:00
req.MFAHeaderVals = mfaCreds
2017-10-23 20:52:56 +00:00
if wrappingLookupFunc != nil {
req.WrapTTL = wrappingLookupFunc(method, lookupPath)
2016-09-29 04:01:28 +00:00
} else {
req.WrapTTL = DefaultWrappingLookupFunc(method, lookupPath)
}
req.Headers = c.Headers()
req.PolicyOverride = policyOverride
2017-10-23 20:52:56 +00:00
2015-08-22 00:36:19 +00:00
return req
2015-03-09 18:38:50 +00:00
}
// RawRequest performs the raw request given. This request may be against
// a Vault server not configured with this client. This is an advanced operation
// that generally won't need to be called externally.
2015-03-11 18:33:20 +00:00
func (c *Client) RawRequest(r *Request) (*Response, error) {
return c.RawRequestWithContext(context.Background(), r)
}
// RawRequestWithContext performs the raw request given. This request may be against
// a Vault server not configured with this client. This is an advanced operation
// that generally won't need to be called externally.
func (c *Client) RawRequestWithContext(ctx context.Context, r *Request) (*Response, error) {
c.modifyLock.RLock()
token := c.token
2018-05-11 14:42:06 +00:00
c.config.modifyLock.RLock()
limiter := c.config.Limiter
minRetryWait := c.config.MinRetryWait
maxRetryWait := c.config.MaxRetryWait
maxRetries := c.config.MaxRetries
checkRetry := c.config.CheckRetry
backoff := c.config.Backoff
httpClient := c.config.HttpClient
timeout := c.config.Timeout
outputCurlString := c.config.OutputCurlString
logger := c.config.Logger
c.config.modifyLock.RUnlock()
2018-05-11 14:42:06 +00:00
c.modifyLock.RUnlock()
for _, cb := range c.requestCallbacks {
cb(r)
}
if c.config.ReadYourWrites {
c.replicationStateStore.requireState(r)
}
if limiter != nil {
limiter.Wait(ctx)
}
// Sanity check the token before potentially erroring from the API
idx := strings.IndexFunc(token, func(c rune) bool {
return !unicode.IsPrint(c)
})
if idx != -1 {
return nil, fmt.Errorf("configured Vault token contains non-printable characters and cannot be used")
}
2015-04-20 18:30:35 +00:00
redirectCount := 0
START:
2019-02-15 18:40:03 +00:00
req, err := r.toRetryableHTTP()
2015-03-09 18:38:50 +00:00
if err != nil {
return nil, err
}
2018-05-09 21:44:53 +00:00
if req == nil {
return nil, fmt.Errorf("nil request created")
}
if outputCurlString {
LastOutputStringError = &OutputStringError{
Request: req,
TLSSkipVerify: c.config.HttpClient.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify,
}
return nil, LastOutputStringError
}
if timeout != 0 {
// Note: we purposefully do not call cancel manually. The reason is
// when canceled, the request.Body will EOF when reading due to the way
// it streams data in. Cancel will still be run when the timeout is
// hit, so this doesn't really harm anything.
ctx, _ = context.WithTimeout(ctx, timeout)
}
req.Request = req.Request.WithContext(ctx)
2018-05-09 21:44:53 +00:00
if backoff == nil {
backoff = retryablehttp.LinearJitterBackoff
}
2015-03-09 18:38:50 +00:00
if checkRetry == nil {
checkRetry = DefaultRetryPolicy
}
2018-05-09 21:44:53 +00:00
client := &retryablehttp.Client{
HTTPClient: httpClient,
RetryWaitMin: minRetryWait,
RetryWaitMax: maxRetryWait,
RetryMax: maxRetries,
2018-05-09 21:44:53 +00:00
Backoff: backoff,
CheckRetry: checkRetry,
Logger: logger,
ErrorHandler: retryablehttp.PassthroughErrorHandler,
2018-05-09 21:44:53 +00:00
}
2015-04-07 18:15:20 +00:00
var result *Response
resp, err := client.Do(req)
2015-04-07 18:15:20 +00:00
if resp != nil {
result = &Response{Response: resp}
}
2015-04-20 18:30:35 +00:00
if err != nil {
2016-08-12 19:13:42 +00:00
if strings.Contains(err.Error(), "tls: oversized") {
err = errwrap.Wrapf(
"{{err}}\n\n"+
"This error usually means that the server is running with TLS disabled\n"+
"but the client is configured to use TLS. Please either enable TLS\n"+
"on the server or run the client with -address set to an address\n"+
"that uses the http protocol:\n\n"+
" vault <command> -address http://<address>\n\n"+
"You can also set the VAULT_ADDR environment variable:\n\n\n"+
" VAULT_ADDR=http://<address> vault <command>\n\n"+
"where <address> is replaced by the actual address to the server.",
err)
2015-04-20 18:30:35 +00:00
}
2015-04-07 18:15:20 +00:00
return result, err
2015-03-09 18:38:50 +00:00
}
2015-04-20 18:30:35 +00:00
// Check for a redirect, only allowing for a single redirect
if (resp.StatusCode == 301 || resp.StatusCode == 302 || resp.StatusCode == 307) && redirectCount == 0 {
2015-04-20 18:30:35 +00:00
// Parse the updated location
respLoc, err := resp.Location()
if err != nil {
return result, err
}
// Ensure a protocol downgrade doesn't happen
if req.URL.Scheme == "https" && respLoc.Scheme != "https" {
return result, fmt.Errorf("redirect would cause protocol downgrade")
}
// Update the request
r.URL = respLoc
// Reset the request body if any
if err := r.ResetJSONBody(); err != nil {
return result, err
}
// Retry the request
redirectCount++
goto START
}
if result != nil {
for _, cb := range c.responseCallbacks {
cb(result)
}
if c.config.ReadYourWrites {
c.replicationStateStore.recordState(result)
}
}
if err := result.Error(); err != nil {
2015-04-07 18:15:20 +00:00
return result, err
}
return result, nil
2015-03-09 18:38:50 +00:00
}
// RequestCallback is a hook that receives an outgoing request; the client
// invokes its request callbacks on a request before sending it.
type RequestCallback func(*Request)

// ResponseCallback is a hook that receives a response; the client invokes its
// response callbacks on the response it is about to return.
type ResponseCallback func(*Response)
// WithRequestCallbacks makes a shallow clone of Client, modifies it to use
// the given callbacks, and returns it. Each of the callbacks will be invoked
// on every outgoing request. A client may be used to issue requests
// concurrently; any locking needed by callbacks invoked concurrently is the
// callback's responsibility.
//
// The given callbacks replace any request callbacks the original client had.
// Because the clone is shallow, pointer fields of the client are shared with
// the original.
func (c *Client) WithRequestCallbacks(callbacks ...RequestCallback) *Client {
	// Copy under the read lock so the snapshot cannot interleave with setters
	// that mutate the client while holding modifyLock.
	c.modifyLock.RLock()
	c2 := *c
	c.modifyLock.RUnlock()

	// The struct copy also copied the lock's state; give the clone a fresh,
	// unlocked mutex of its own.
	c2.modifyLock = sync.RWMutex{}
	c2.requestCallbacks = callbacks
	return &c2
}
// WithResponseCallbacks makes a shallow clone of Client, modifies it to use
// the given callbacks, and returns it. Each of the callbacks will be invoked
// on every received response. A client may be used to issue requests
// concurrently; any locking needed by callbacks invoked concurrently is the
// callback's responsibility.
//
// The given callbacks replace any response callbacks the original client had.
// Because the clone is shallow, pointer fields of the client are shared with
// the original.
func (c *Client) WithResponseCallbacks(callbacks ...ResponseCallback) *Client {
	// Copy under the read lock so the snapshot cannot interleave with setters
	// that mutate the client while holding modifyLock.
	c.modifyLock.RLock()
	c2 := *c
	c.modifyLock.RUnlock()

	// The struct copy also copied the lock's state; give the clone a fresh,
	// unlocked mutex of its own.
	c2.modifyLock = sync.RWMutex{}
	c2.responseCallbacks = callbacks
	return &c2
}
// RecordState returns a response callback that stores the value of the
// response's X-Vault-Index header (the state returned by Vault) into *state.
func RecordState(state *string) ResponseCallback {
	record := func(resp *Response) {
		*state = resp.Header.Get(HeaderIndex)
	}
	return record
}
// RequireState returns a request callback that adds one X-Vault-Index request
// header per given state, specifying the state we require of Vault. These
// states were obtained from response headers seen previously, probably
// captured with RecordState.
func RequireState(states ...string) RequestCallback {
	return func(req *Request) {
		for i := range states {
			req.Headers.Add(HeaderIndex, states[i])
		}
	}
}
// compareReplicationStates compares two encoded replication states. It
// returns 1 when both indexes of s1 are at least those of s2 (s1 is newer or
// identical), -1 when both indexes of s1 are at most those of s2 (s1 is
// older), and 0 when neither state dominates the other. An error is returned
// if either state fails to parse or the two come from different clusters.
func compareReplicationStates(s1, s2 string) (int, error) {
	first, err := ParseReplicationState(s1, nil)
	if err != nil {
		return 0, err
	}

	second, err := ParseReplicationState(s2, nil)
	if err != nil {
		return 0, err
	}

	if first.ClusterID != second.ClusterID {
		return 0, fmt.Errorf("can't compare replication states with different ClusterIDs")
	}

	if first.LocalIndex >= second.LocalIndex && first.ReplicatedIndex >= second.ReplicatedIndex {
		return 1, nil
	}

	// Full equality was already covered by the check above, so reaching this
	// test means at least one index of s1 is strictly lower.
	if first.LocalIndex <= second.LocalIndex && first.ReplicatedIndex <= second.ReplicatedIndex {
		return -1, nil
	}

	return 0, nil
}
// MergeReplicationStates merges `new` into the replication states in `old`
// and returns the result with duplicates removed. Each old state is compared
// against `new` with compareReplicationStates: an old state that is newer or
// identical is kept, one that is older is replaced by `new`, and when neither
// dominates both are kept. If `old` is empty or has more than two entries, or
// any comparison fails, the result is just `new`.
func MergeReplicationStates(old []string, new string) []string {
	if n := len(old); n == 0 || n > 2 {
		return []string{new}
	}

	var merged []string
	for _, prev := range old {
		cmp, err := compareReplicationStates(prev, new)
		if err != nil {
			return []string{new}
		}

		// 1 keeps only prev, -1 keeps only new, 0 keeps both in that order.
		if cmp == 1 || cmp == 0 {
			merged = append(merged, prev)
		}
		if cmp == -1 || cmp == 0 {
			merged = append(merged, new)
		}
	}

	return strutil.RemoveDuplicates(merged, false)
}
// ParseReplicationState decodes a base64-encoded replication state of the
// form "v1:<cluster ID>:<local index>:<replicated index>:<hex HMAC>" into a
// WALState.
//
// The part after the last colon must be valid hex. When hmacKey is non-empty
// it must also equal the SHA-256 HMAC of everything before that colon; with
// an empty key the HMAC is not verified. An error is returned for any
// malformed input or HMAC mismatch.
func ParseReplicationState(raw string, hmacKey []byte) (*logical.WALState, error) {
	decoded, err := base64.StdEncoding.DecodeString(raw)
	if err != nil {
		return nil, err
	}

	full := string(decoded)
	sep := strings.LastIndex(full, ":")
	if sep < 0 {
		return nil, fmt.Errorf("invalid full state header format")
	}
	state, macHex := full[:sep], full[sep+1:]

	mac, err := hex.DecodeString(macHex)
	if err != nil {
		return nil, fmt.Errorf("invalid state header HMAC: %v, %w", macHex, err)
	}

	if len(hmacKey) > 0 {
		h := hmac.New(sha256.New, hmacKey)
		h.Write([]byte(state))
		if !hmac.Equal(h.Sum(nil), mac) {
			return nil, fmt.Errorf("invalid state header HMAC (mismatch)")
		}
	}

	fields := strings.Split(state, ":")
	wellFormed := len(fields) == 4 && fields[0] == "v1" && fields[1] != ""
	if !wellFormed {
		return nil, fmt.Errorf("invalid state header format")
	}

	local, err := strconv.ParseUint(fields[2], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid local index in state header: %w", err)
	}

	replicated, err := strconv.ParseUint(fields[3], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid replicated index in state header: %w", err)
	}

	parsed := &logical.WALState{
		ClusterID:       fields[1],
		LocalIndex:      local,
		ReplicatedIndex: replicated,
	}
	return parsed, nil
}
// ForwardInconsistent returns a request callback that sets the
// X-Vault-Inconsistent request header, which says: if the state required
// isn't present on the node receiving this request, forward it to the active
// node. This should be used in conjunction with RequireState.
func ForwardInconsistent() RequestCallback {
	const name, value = "X-Vault-Inconsistent", "forward-active-node"
	return func(req *Request) {
		req.Headers.Set(name, value)
	}
}
// ForwardAlways returns a request callback that sets the X-Vault-Forward
// request header, telling any performance standbys handling the request to
// forward it to the active node. This feature must be enabled in Vault's
// configuration.
func ForwardAlways() RequestCallback {
	const name, value = "X-Vault-Forward", "active-node"
	return func(req *Request) {
		req.Headers.Set(name, value)
	}
}
// DefaultRetryPolicy is the default retry policy used by new Client objects.
// It defers to retryablehttp.DefaultRetryPolicy first; when that policy
// neither errors nor asks for a retry, it additionally retries 412 responses,
// which are returned by Vault when a X-Vault-Index header isn't satisfied.
func DefaultRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
	shouldRetry, policyErr := retryablehttp.DefaultRetryPolicy(ctx, resp, err)
	if shouldRetry || policyErr != nil {
		return shouldRetry, policyErr
	}

	unsatisfiedIndex := resp != nil && resp.StatusCode == http.StatusPreconditionFailed
	return unsatisfiedIndex, nil
}
// replicationStateStore is used to track cluster replication states
// in order to ensure proper read-after-write semantics for a Client.
// Its methods take m before touching store.
type replicationStateStore struct {
	// m guards store.
	m sync.RWMutex
	// store holds the tracked replication states, as taken from
	// X-Vault-Index response header values and merged on each update.
	store []string
}
// recordState merges the replication state found in the response's
// X-Vault-Index header into the store. A response without that header leaves
// the store unchanged.
func (w *replicationStateStore) recordState(resp *Response) {
	w.m.Lock()
	defer w.m.Unlock()

	if latest := resp.Header.Get(HeaderIndex); latest != "" {
		w.store = MergeReplicationStates(w.store, latest)
	}
}
// requireState adds one X-Vault-Index header to the Request for each
// replication state currently held in the store.
func (w *replicationStateStore) requireState(req *Request) {
	w.m.RLock()
	defer w.m.RUnlock()

	for i := range w.store {
		req.Headers.Add(HeaderIndex, w.store[i])
	}
}
// states returns a copy of the replication states currently stored, so the
// result does not alias the store's own slice.
func (w *replicationStateStore) states() []string {
	w.m.RLock()
	snapshot := make([]string, len(w.store))
	copy(snapshot, w.store)
	w.m.RUnlock()

	return snapshot
}