Merge pull request #2704 from hashicorp/f-relay-query-responses
Add relay-factor arg to keyring operations
This commit is contained in:
commit
fbcd43e95c
|
@ -121,31 +121,44 @@ func (a *Agent) keyringProcess(args *structs.KeyringRequest) (*structs.KeyringRe
|
|||
return &reply, nil
|
||||
}
|
||||
|
||||
// ParseRelayFactor validates and converts the given relay factor to uint8
|
||||
func ParseRelayFactor(n int) (uint8, error) {
|
||||
if n < 0 || n > 5 {
|
||||
return 0, fmt.Errorf("Relay factor must be in range: [0, 5]")
|
||||
}
|
||||
return uint8(n), nil
|
||||
}
|
||||
|
||||
// ListKeys lists out all keys installed on the collective Consul cluster. This
|
||||
// includes both servers and clients in all DC's.
|
||||
func (a *Agent) ListKeys(token string) (*structs.KeyringResponses, error) {
|
||||
func (a *Agent) ListKeys(token string, relayFactor uint8) (*structs.KeyringResponses, error) {
|
||||
args := structs.KeyringRequest{Operation: structs.KeyringList}
|
||||
args.Token = token
|
||||
parseKeyringRequest(&args, token, relayFactor)
|
||||
return a.keyringProcess(&args)
|
||||
}
|
||||
|
||||
// InstallKey installs a new gossip encryption key
|
||||
func (a *Agent) InstallKey(key, token string) (*structs.KeyringResponses, error) {
|
||||
func (a *Agent) InstallKey(key, token string, relayFactor uint8) (*structs.KeyringResponses, error) {
|
||||
args := structs.KeyringRequest{Key: key, Operation: structs.KeyringInstall}
|
||||
args.Token = token
|
||||
parseKeyringRequest(&args, token, relayFactor)
|
||||
return a.keyringProcess(&args)
|
||||
}
|
||||
|
||||
// UseKey changes the primary encryption key used to encrypt messages
|
||||
func (a *Agent) UseKey(key, token string) (*structs.KeyringResponses, error) {
|
||||
func (a *Agent) UseKey(key, token string, relayFactor uint8) (*structs.KeyringResponses, error) {
|
||||
args := structs.KeyringRequest{Key: key, Operation: structs.KeyringUse}
|
||||
args.Token = token
|
||||
parseKeyringRequest(&args, token, relayFactor)
|
||||
return a.keyringProcess(&args)
|
||||
}
|
||||
|
||||
// RemoveKey will remove a gossip encryption key from the keyring
|
||||
func (a *Agent) RemoveKey(key, token string) (*structs.KeyringResponses, error) {
|
||||
func (a *Agent) RemoveKey(key, token string, relayFactor uint8) (*structs.KeyringResponses, error) {
|
||||
args := structs.KeyringRequest{Key: key, Operation: structs.KeyringRemove}
|
||||
args.Token = token
|
||||
parseKeyringRequest(&args, token, relayFactor)
|
||||
return a.keyringProcess(&args)
|
||||
}
|
||||
|
||||
func parseKeyringRequest(req *structs.KeyringRequest, token string, relayFactor uint8) {
|
||||
req.Token = token
|
||||
req.RelayFactor = relayFactor
|
||||
}
|
||||
|
|
|
@ -132,49 +132,49 @@ func TestAgentKeyring_ACL(t *testing.T) {
|
|||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
// List keys without access fails
|
||||
_, err := agent.ListKeys("")
|
||||
_, err := agent.ListKeys("", 0)
|
||||
if err == nil || !strings.Contains(err.Error(), "denied") {
|
||||
t.Fatalf("expected denied error, got: %#v", err)
|
||||
}
|
||||
|
||||
// List keys with access works
|
||||
_, err = agent.ListKeys("root")
|
||||
_, err = agent.ListKeys("root", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Install without access fails
|
||||
_, err = agent.InstallKey(key2, "")
|
||||
_, err = agent.InstallKey(key2, "", 0)
|
||||
if err == nil || !strings.Contains(err.Error(), "denied") {
|
||||
t.Fatalf("expected denied error, got: %#v", err)
|
||||
}
|
||||
|
||||
// Install with access works
|
||||
_, err = agent.InstallKey(key2, "root")
|
||||
_, err = agent.InstallKey(key2, "root", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Use without access fails
|
||||
_, err = agent.UseKey(key2, "")
|
||||
_, err = agent.UseKey(key2, "", 0)
|
||||
if err == nil || !strings.Contains(err.Error(), "denied") {
|
||||
t.Fatalf("expected denied error, got: %#v", err)
|
||||
}
|
||||
|
||||
// Use with access works
|
||||
_, err = agent.UseKey(key2, "root")
|
||||
_, err = agent.UseKey(key2, "root", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Remove without access fails
|
||||
_, err = agent.RemoveKey(key1, "")
|
||||
_, err = agent.RemoveKey(key1, "", 0)
|
||||
if err == nil || !strings.Contains(err.Error(), "denied") {
|
||||
t.Fatalf("expected denied error, got: %#v", err)
|
||||
}
|
||||
|
||||
// Remove with access works
|
||||
_, err = agent.RemoveKey(key1, "root")
|
||||
_, err = agent.RemoveKey(key1, "root", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/hashicorp/consul/consul/structs"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/raft"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// OperatorRaftConfiguration is used to inspect the current Raft configuration.
|
||||
|
@ -59,8 +60,9 @@ func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Reques
|
|||
}
|
||||
|
||||
type keyringArgs struct {
|
||||
Key string
|
||||
Token string
|
||||
Key string
|
||||
Token string
|
||||
RelayFactor uint8
|
||||
}
|
||||
|
||||
// OperatorKeyringEndpoint handles keyring operations (install, list, use, remove)
|
||||
|
@ -75,6 +77,23 @@ func (s *HTTPServer) OperatorKeyringEndpoint(resp http.ResponseWriter, req *http
|
|||
}
|
||||
s.parseToken(req, &args.Token)
|
||||
|
||||
// Parse relay factor
|
||||
if relayFactor := req.URL.Query().Get("relay-factor"); relayFactor != "" {
|
||||
n, err := strconv.Atoi(relayFactor)
|
||||
if err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte(fmt.Sprintf("Error parsing relay factor: %v", err)))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
args.RelayFactor, err = ParseRelayFactor(n)
|
||||
if err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte(fmt.Sprintf("Invalid relay factor: %v", err)))
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Switch on the method
|
||||
switch req.Method {
|
||||
case "GET":
|
||||
|
@ -93,7 +112,7 @@ func (s *HTTPServer) OperatorKeyringEndpoint(resp http.ResponseWriter, req *http
|
|||
|
||||
// KeyringInstall is used to install a new gossip encryption key into the cluster
|
||||
func (s *HTTPServer) KeyringInstall(resp http.ResponseWriter, req *http.Request, args *keyringArgs) (interface{}, error) {
|
||||
responses, err := s.agent.InstallKey(args.Key, args.Token)
|
||||
responses, err := s.agent.InstallKey(args.Key, args.Token, args.RelayFactor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -103,7 +122,7 @@ func (s *HTTPServer) KeyringInstall(resp http.ResponseWriter, req *http.Request,
|
|||
|
||||
// KeyringList is used to list the keys installed in the cluster
|
||||
func (s *HTTPServer) KeyringList(resp http.ResponseWriter, req *http.Request, args *keyringArgs) (interface{}, error) {
|
||||
responses, err := s.agent.ListKeys(args.Token)
|
||||
responses, err := s.agent.ListKeys(args.Token, args.RelayFactor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -113,7 +132,7 @@ func (s *HTTPServer) KeyringList(resp http.ResponseWriter, req *http.Request, ar
|
|||
|
||||
// KeyringRemove is used to list the keys installed in the cluster
|
||||
func (s *HTTPServer) KeyringRemove(resp http.ResponseWriter, req *http.Request, args *keyringArgs) (interface{}, error) {
|
||||
responses, err := s.agent.RemoveKey(args.Key, args.Token)
|
||||
responses, err := s.agent.RemoveKey(args.Key, args.Token, args.RelayFactor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -123,7 +142,7 @@ func (s *HTTPServer) KeyringRemove(resp http.ResponseWriter, req *http.Request,
|
|||
|
||||
// KeyringUse is used to change the primary gossip encryption key
|
||||
func (s *HTTPServer) KeyringUse(resp http.ResponseWriter, req *http.Request, args *keyringArgs) (interface{}, error) {
|
||||
responses, err := s.agent.UseKey(args.Key, args.Token)
|
||||
responses, err := s.agent.UseKey(args.Key, args.Token, args.RelayFactor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ func TestOperator_KeyringInstall(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
listResponse, err := srv.agent.ListKeys("")
|
||||
listResponse, err := srv.agent.ListKeys("", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -155,13 +155,13 @@ func TestOperator_KeyringRemove(t *testing.T) {
|
|||
c.EncryptKey = key
|
||||
}
|
||||
httpTestWithConfig(t, func(srv *HTTPServer) {
|
||||
_, err := srv.agent.InstallKey(tempKey, "")
|
||||
_, err := srv.agent.InstallKey(tempKey, "", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure the temp key is installed
|
||||
list, err := srv.agent.ListKeys("")
|
||||
list, err := srv.agent.ListKeys("", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -191,7 +191,7 @@ func TestOperator_KeyringRemove(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the temp key has been removed
|
||||
list, err = srv.agent.ListKeys("")
|
||||
list, err = srv.agent.ListKeys("", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -217,7 +217,7 @@ func TestOperator_KeyringUse(t *testing.T) {
|
|||
c.EncryptKey = oldKey
|
||||
}
|
||||
httpTestWithConfig(t, func(srv *HTTPServer) {
|
||||
if _, err := srv.agent.InstallKey(newKey, ""); err != nil {
|
||||
if _, err := srv.agent.InstallKey(newKey, "", 0); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -233,12 +233,12 @@ func TestOperator_KeyringUse(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if _, err := srv.agent.RemoveKey(oldKey, ""); err != nil {
|
||||
if _, err := srv.agent.RemoveKey(oldKey, "", 0); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure only the new key remains
|
||||
list, err := srv.agent.ListKeys("")
|
||||
list, err := srv.agent.ListKeys("", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -256,3 +256,32 @@ func TestOperator_KeyringUse(t *testing.T) {
|
|||
}
|
||||
}, configFunc)
|
||||
}
|
||||
|
||||
func TestOperator_Keyring_InvalidRelayFactor(t *testing.T) {
|
||||
key := "H3/9gBxcKKRf45CaI2DlRg=="
|
||||
configFunc := func(c *Config) {
|
||||
c.EncryptKey = key
|
||||
}
|
||||
httpTestWithConfig(t, func(srv *HTTPServer) {
|
||||
cases := map[string]string{
|
||||
"999": "Relay factor must be in range",
|
||||
"asdf": "Error parsing relay factor",
|
||||
}
|
||||
for relayFactor, errString := range cases {
|
||||
req, err := http.NewRequest("GET", "/v1/operator/keyring?relay-factor="+relayFactor, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
_, err = srv.OperatorKeyringEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
body := resp.Body.String()
|
||||
if !strings.Contains(body, errString) {
|
||||
t.Fatalf("bad: %v", body)
|
||||
}
|
||||
}
|
||||
}, configFunc)
|
||||
}
|
||||
|
|
|
@ -106,7 +106,8 @@ type joinResponse struct {
|
|||
}
|
||||
|
||||
type keyringRequest struct {
|
||||
Key string
|
||||
Key string
|
||||
RelayFactor uint8
|
||||
}
|
||||
|
||||
type KeyringEntry struct {
|
||||
|
@ -604,21 +605,21 @@ func (i *AgentRPC) handleKeyring(client *rpcClient, seq uint64, cmd, token strin
|
|||
var r keyringResponse
|
||||
var err error
|
||||
|
||||
if cmd != listKeysCommand {
|
||||
if err = client.dec.Decode(&req); err != nil {
|
||||
return fmt.Errorf("decode failed: %v", err)
|
||||
}
|
||||
if err = client.dec.Decode(&req); err != nil {
|
||||
return fmt.Errorf("decode failed: %v", err)
|
||||
}
|
||||
|
||||
i.agent.logger.Printf("[INFO] agent: Sending rpc command with relay factor %d", req.RelayFactor)
|
||||
|
||||
switch cmd {
|
||||
case listKeysCommand:
|
||||
queryResp, err = i.agent.ListKeys(token)
|
||||
queryResp, err = i.agent.ListKeys(token, req.RelayFactor)
|
||||
case installKeyCommand:
|
||||
queryResp, err = i.agent.InstallKey(req.Key, token)
|
||||
queryResp, err = i.agent.InstallKey(req.Key, token, req.RelayFactor)
|
||||
case useKeyCommand:
|
||||
queryResp, err = i.agent.UseKey(req.Key, token)
|
||||
queryResp, err = i.agent.UseKey(req.Key, token, req.RelayFactor)
|
||||
case removeKeyCommand:
|
||||
queryResp, err = i.agent.RemoveKey(req.Key, token)
|
||||
queryResp, err = i.agent.RemoveKey(req.Key, token, req.RelayFactor)
|
||||
default:
|
||||
respHeader := responseHeader{Seq: seq, Error: unsupportedCommand}
|
||||
client.Send(&respHeader, nil)
|
||||
|
|
|
@ -194,48 +194,49 @@ func (c *RPCClient) WANMembers() ([]Member, error) {
|
|||
return resp.Members, err
|
||||
}
|
||||
|
||||
func (c *RPCClient) ListKeys(token string) (keyringResponse, error) {
|
||||
func (c *RPCClient) ListKeys(token string, relayFactor uint8) (keyringResponse, error) {
|
||||
header := requestHeader{
|
||||
Command: listKeysCommand,
|
||||
Seq: c.getSeq(),
|
||||
Token: token,
|
||||
}
|
||||
req := keyringRequest{RelayFactor: relayFactor}
|
||||
var resp keyringResponse
|
||||
err := c.genericRPC(&header, nil, &resp)
|
||||
err := c.genericRPC(&header, req, &resp)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (c *RPCClient) InstallKey(key, token string) (keyringResponse, error) {
|
||||
func (c *RPCClient) InstallKey(key, token string, relayFactor uint8) (keyringResponse, error) {
|
||||
header := requestHeader{
|
||||
Command: installKeyCommand,
|
||||
Seq: c.getSeq(),
|
||||
Token: token,
|
||||
}
|
||||
req := keyringRequest{key}
|
||||
req := keyringRequest{Key: key, RelayFactor: relayFactor}
|
||||
var resp keyringResponse
|
||||
err := c.genericRPC(&header, &req, &resp)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (c *RPCClient) UseKey(key, token string) (keyringResponse, error) {
|
||||
func (c *RPCClient) UseKey(key, token string, relayFactor uint8) (keyringResponse, error) {
|
||||
header := requestHeader{
|
||||
Command: useKeyCommand,
|
||||
Seq: c.getSeq(),
|
||||
Token: token,
|
||||
}
|
||||
req := keyringRequest{key}
|
||||
req := keyringRequest{Key: key, RelayFactor: relayFactor}
|
||||
var resp keyringResponse
|
||||
err := c.genericRPC(&header, &req, &resp)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (c *RPCClient) RemoveKey(key, token string) (keyringResponse, error) {
|
||||
func (c *RPCClient) RemoveKey(key, token string, relayFactor uint8) (keyringResponse, error) {
|
||||
header := requestHeader{
|
||||
Command: removeKeyCommand,
|
||||
Seq: c.getSeq(),
|
||||
Token: token,
|
||||
}
|
||||
req := keyringRequest{key}
|
||||
req := keyringRequest{Key: key, RelayFactor: relayFactor}
|
||||
var resp keyringResponse
|
||||
err := c.genericRPC(&header, &req, &resp)
|
||||
return resp, err
|
||||
|
|
|
@ -371,7 +371,7 @@ func TestRPCClientInstallKey(t *testing.T) {
|
|||
})
|
||||
|
||||
// install key2
|
||||
r, err := p1.client.InstallKey(key2, "")
|
||||
r, err := p1.client.InstallKey(key2, "", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -402,7 +402,7 @@ func TestRPCClientUseKey(t *testing.T) {
|
|||
defer p1.Close()
|
||||
|
||||
// add a second key to the ring
|
||||
r, err := p1.client.InstallKey(key2, "")
|
||||
r, err := p1.client.InstallKey(key2, "", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -423,21 +423,21 @@ func TestRPCClientUseKey(t *testing.T) {
|
|||
})
|
||||
|
||||
// can't remove key1 yet
|
||||
r, err = p1.client.RemoveKey(key1, "")
|
||||
r, err = p1.client.RemoveKey(key1, "", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
keyringError(t, r)
|
||||
|
||||
// change primary key
|
||||
r, err = p1.client.UseKey(key2, "")
|
||||
r, err = p1.client.UseKey(key2, "", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
keyringSuccess(t, r)
|
||||
|
||||
// can remove key1 now
|
||||
r, err = p1.client.RemoveKey(key1, "")
|
||||
r, err = p1.client.RemoveKey(key1, "", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -450,7 +450,7 @@ func TestRPCClientKeyOperation_encryptionDisabled(t *testing.T) {
|
|||
})
|
||||
defer p1.Close()
|
||||
|
||||
r, err := p1.client.ListKeys("")
|
||||
r, err := p1.client.ListKeys("", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -458,7 +458,7 @@ func TestRPCClientKeyOperation_encryptionDisabled(t *testing.T) {
|
|||
}
|
||||
|
||||
func listKeys(t *testing.T, c *RPCClient) map[string]map[string]int {
|
||||
resp, err := c.ListKeys("")
|
||||
resp, err := c.ListKeys("", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ type KeyringCommand struct {
|
|||
func (c *KeyringCommand) Run(args []string) int {
|
||||
var installKey, useKey, removeKey, token string
|
||||
var listKeys bool
|
||||
var relay int
|
||||
|
||||
cmdFlags := flag.NewFlagSet("keys", flag.ContinueOnError)
|
||||
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
|
||||
|
@ -27,6 +28,7 @@ func (c *KeyringCommand) Run(args []string) int {
|
|||
cmdFlags.StringVar(&removeKey, "remove", "", "remove key")
|
||||
cmdFlags.BoolVar(&listKeys, "list", false, "list keys")
|
||||
cmdFlags.StringVar(&token, "token", "", "acl token")
|
||||
cmdFlags.IntVar(&relay, "relay-factor", 0, "relay factor")
|
||||
|
||||
rpcAddr := RPCAddrFlag(cmdFlags)
|
||||
if err := cmdFlags.Parse(args); err != nil {
|
||||
|
@ -56,6 +58,13 @@ func (c *KeyringCommand) Run(args []string) int {
|
|||
return 1
|
||||
}
|
||||
|
||||
// Validate the relay factor
|
||||
relayFactor, err := agent.ParseRelayFactor(relay)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error parsing relay factor: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
// All other operations will require a client connection
|
||||
client, err := RPCClient(*rpcAddr)
|
||||
if err != nil {
|
||||
|
@ -66,7 +75,7 @@ func (c *KeyringCommand) Run(args []string) int {
|
|||
|
||||
if listKeys {
|
||||
c.Ui.Info("Gathering installed encryption keys...")
|
||||
r, err := client.ListKeys(token)
|
||||
r, err := client.ListKeys(token, relayFactor)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("error: %s", err))
|
||||
return 1
|
||||
|
@ -80,7 +89,7 @@ func (c *KeyringCommand) Run(args []string) int {
|
|||
|
||||
if installKey != "" {
|
||||
c.Ui.Info("Installing new gossip encryption key...")
|
||||
r, err := client.InstallKey(installKey, token)
|
||||
r, err := client.InstallKey(installKey, token, relayFactor)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("error: %s", err))
|
||||
return 1
|
||||
|
@ -90,7 +99,7 @@ func (c *KeyringCommand) Run(args []string) int {
|
|||
|
||||
if useKey != "" {
|
||||
c.Ui.Info("Changing primary gossip encryption key...")
|
||||
r, err := client.UseKey(useKey, token)
|
||||
r, err := client.UseKey(useKey, token, relayFactor)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("error: %s", err))
|
||||
return 1
|
||||
|
@ -100,7 +109,7 @@ func (c *KeyringCommand) Run(args []string) int {
|
|||
|
||||
if removeKey != "" {
|
||||
c.Ui.Info("Removing gossip encryption key...")
|
||||
r, err := client.RemoveKey(removeKey, token)
|
||||
r, err := client.RemoveKey(removeKey, token, relayFactor)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("error: %s", err))
|
||||
return 1
|
||||
|
@ -206,6 +215,11 @@ Options:
|
|||
not currently the primary key.
|
||||
-token="" ACL token to use during requests. Defaults to that
|
||||
of the agent.
|
||||
-relay-factor Added in Consul 0.7.4, setting this to a non-zero
|
||||
value will cause nodes to relay their response to
|
||||
the operation through this many randomly-chosen
|
||||
other nodes in the cluster. The maximum allowed
|
||||
value is 5.
|
||||
-use=<key> Change the primary encryption key, which is used to
|
||||
encrypt messages. The key must already be installed
|
||||
before this operation can succeed.
|
||||
|
|
|
@ -89,6 +89,17 @@ func TestKeyringCommandRun_failedConnection(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestKeyringCommandRun_invalidRelayFactor(t *testing.T) {
|
||||
ui := new(cli.MockUi)
|
||||
c := &KeyringCommand{Ui: ui}
|
||||
|
||||
args := []string{"-list", "-relay-factor=6"}
|
||||
code := c.Run(args)
|
||||
if code != 1 {
|
||||
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
|
||||
}
|
||||
}
|
||||
|
||||
func listKeys(t *testing.T, addr string) string {
|
||||
ui := new(cli.MockUi)
|
||||
c := &KeyringCommand{Ui: ui}
|
||||
|
|
|
@ -147,15 +147,16 @@ func (m *Internal) executeKeyringOp(
|
|||
mgr = m.srv.KeyManagerLAN()
|
||||
}
|
||||
|
||||
opts := &serf.KeyRequestOptions{RelayFactor: args.RelayFactor}
|
||||
switch args.Operation {
|
||||
case structs.KeyringList:
|
||||
serfResp, err = mgr.ListKeys()
|
||||
serfResp, err = mgr.ListKeysWithOptions(opts)
|
||||
case structs.KeyringInstall:
|
||||
serfResp, err = mgr.InstallKey(args.Key)
|
||||
serfResp, err = mgr.InstallKeyWithOptions(args.Key, opts)
|
||||
case structs.KeyringUse:
|
||||
serfResp, err = mgr.UseKey(args.Key)
|
||||
serfResp, err = mgr.UseKeyWithOptions(args.Key, opts)
|
||||
case structs.KeyringRemove:
|
||||
serfResp, err = mgr.RemoveKey(args.Key)
|
||||
serfResp, err = mgr.RemoveKeyWithOptions(args.Key, opts)
|
||||
}
|
||||
|
||||
errStr := ""
|
||||
|
|
|
@ -1000,10 +1000,11 @@ const (
|
|||
// KeyringRequest encapsulates a request to modify an encryption keyring.
|
||||
// It can be used for install, remove, or use key type operations.
|
||||
type KeyringRequest struct {
|
||||
Operation KeyringOp
|
||||
Key string
|
||||
Datacenter string
|
||||
Forwarded bool
|
||||
Operation KeyringOp
|
||||
Key string
|
||||
Datacenter string
|
||||
Forwarded bool
|
||||
RelayFactor uint8
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ var ProtocolVersionMap map[uint8]uint8
|
|||
|
||||
func init() {
|
||||
ProtocolVersionMap = map[uint8]uint8{
|
||||
5: 2,
|
||||
4: 2,
|
||||
3: 2,
|
||||
2: 2,
|
||||
|
@ -240,7 +241,7 @@ func DefaultConfig() *Config {
|
|||
EventBuffer: 512,
|
||||
QueryBuffer: 512,
|
||||
LogOutput: os.Stderr,
|
||||
ProtocolVersion: ProtocolVersionMax,
|
||||
ProtocolVersion: 4,
|
||||
ReapInterval: 15 * time.Second,
|
||||
RecentIntentTimeout: 5 * time.Minute,
|
||||
ReconnectInterval: 30 * time.Second,
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package serf
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
)
|
||||
|
||||
// delegate is the memberlist.Delegate implementation that Serf uses.
|
||||
|
@ -83,6 +85,25 @@ func (d *delegate) NotifyMsg(buf []byte) {
|
|||
d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From)
|
||||
d.serf.handleQueryResponse(&resp)
|
||||
|
||||
case messageRelayType:
|
||||
var header relayHeader
|
||||
var handle codec.MsgpackHandle
|
||||
reader := bytes.NewReader(buf[1:])
|
||||
decoder := codec.NewDecoder(reader, &handle)
|
||||
if err := decoder.Decode(&header); err != nil {
|
||||
d.serf.logger.Printf("[ERR] serf: Error decoding relay header: %s", err)
|
||||
break
|
||||
}
|
||||
|
||||
// The remaining contents are the message itself, so forward that
|
||||
raw := make([]byte, reader.Len())
|
||||
reader.Read(raw)
|
||||
d.serf.logger.Printf("[DEBUG] serf: Relaying response to addr: %s", header.DestAddr.String())
|
||||
if err := d.serf.memberlist.SendTo(&header.DestAddr, raw); err != nil {
|
||||
d.serf.logger.Printf("[ERR] serf: Error forwarding message to %s: %s", header.DestAddr.String(), err)
|
||||
break
|
||||
}
|
||||
|
||||
default:
|
||||
d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package serf
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -95,18 +96,19 @@ func (u UserEvent) String() string {
|
|||
return fmt.Sprintf("user-event: %s", u.Name)
|
||||
}
|
||||
|
||||
// Query is the struct used EventQuery type events
|
||||
// Query is the struct used by EventQuery type events
|
||||
type Query struct {
|
||||
LTime LamportTime
|
||||
Name string
|
||||
Payload []byte
|
||||
|
||||
serf *Serf
|
||||
id uint32 // ID is not exported, since it may change
|
||||
addr []byte // Address to respond to
|
||||
port uint16 // Port to respond to
|
||||
deadline time.Time // Must respond by this deadline
|
||||
respLock sync.Mutex
|
||||
serf *Serf
|
||||
id uint32 // ID is not exported, since it may change
|
||||
addr []byte // Address to respond to
|
||||
port uint16 // Port to respond to
|
||||
deadline time.Time // Must respond by this deadline
|
||||
relayFactor uint8 // Number of duplicate responses to relay back to sender
|
||||
respLock sync.Mutex
|
||||
}
|
||||
|
||||
func (q *Query) EventType() EventType {
|
||||
|
@ -145,24 +147,84 @@ func (q *Query) Respond(buf []byte) error {
|
|||
Payload: buf,
|
||||
}
|
||||
|
||||
// Format the response
|
||||
raw, err := encodeMessage(messageQueryResponseType, &resp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to format response: %v", err)
|
||||
// Send a direct response
|
||||
{
|
||||
raw, err := encodeMessage(messageQueryResponseType, &resp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to format response: %v", err)
|
||||
}
|
||||
|
||||
// Check the size limit
|
||||
if len(raw) > q.serf.config.QueryResponseSizeLimit {
|
||||
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
|
||||
}
|
||||
|
||||
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
|
||||
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Check the size limit
|
||||
if len(raw) > q.serf.config.QueryResponseSizeLimit {
|
||||
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
|
||||
// Relay the response through up to relayFactor other nodes
|
||||
members := q.serf.Members()
|
||||
if len(members) > 2 {
|
||||
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
|
||||
raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to format relayed response: %v", err)
|
||||
}
|
||||
|
||||
// Check the size limit
|
||||
if len(raw) > q.serf.config.QueryResponseSizeLimit {
|
||||
return fmt.Errorf("relayed response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
|
||||
}
|
||||
|
||||
relayMembers := kRandomMembers(int(q.relayFactor), members, func(m Member) bool {
|
||||
return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == q.serf.LocalMember().Name
|
||||
})
|
||||
|
||||
for _, m := range relayMembers {
|
||||
relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
|
||||
if err := q.serf.memberlist.SendTo(&relayAddr, raw); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send the response
|
||||
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
|
||||
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Clera the deadline, response sent
|
||||
// Clear the deadline, responses sent
|
||||
q.deadline = time.Time{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// kRandomMembers selects up to k members from a given list, optionally
|
||||
// filtering by the given filterFunc
|
||||
func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
|
||||
n := len(members)
|
||||
kMembers := make([]Member, 0, k)
|
||||
OUTER:
|
||||
// Probe up to 3*n times, with large n this is not necessary
|
||||
// since k << n, but with small n we want search to be
|
||||
// exhaustive
|
||||
for i := 0; i < 3*n && len(kMembers) < k; i++ {
|
||||
// Get random member
|
||||
idx := rand.Intn(n)
|
||||
member := members[idx]
|
||||
|
||||
// Give the filter a shot at it.
|
||||
if filterFunc != nil && filterFunc(member) {
|
||||
continue OUTER
|
||||
}
|
||||
|
||||
// Check if we have this member already
|
||||
for j := 0; j < len(kMembers); j++ {
|
||||
if member.Name == kMembers[j].Name {
|
||||
continue OUTER
|
||||
}
|
||||
}
|
||||
|
||||
// Append the member
|
||||
kMembers = append(kMembers, member)
|
||||
}
|
||||
|
||||
return kMembers
|
||||
}
|
||||
|
|
|
@ -33,6 +33,13 @@ type KeyResponse struct {
|
|||
Keys map[string]int
|
||||
}
|
||||
|
||||
// KeyRequestOptions is used to contain optional parameters for a keyring operation
|
||||
type KeyRequestOptions struct {
|
||||
// RelayFactor is the number of duplicate query responses to send by relaying through
|
||||
// other nodes, for redundancy
|
||||
RelayFactor uint8
|
||||
}
|
||||
|
||||
// streamKeyResp takes care of reading responses from a channel and composing
|
||||
// them into a KeyResponse. It will update a KeyResponse *in place* and
|
||||
// therefore has nothing to return.
|
||||
|
@ -83,7 +90,7 @@ func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) {
|
|||
// handleKeyRequest performs query broadcasting to all members for any type of
|
||||
// key operation and manages gathering responses and packing them up into a
|
||||
// KeyResponse for uniform response handling.
|
||||
func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
|
||||
func (k *KeyManager) handleKeyRequest(key, query string, opts *KeyRequestOptions) (*KeyResponse, error) {
|
||||
resp := &KeyResponse{
|
||||
Messages: make(map[string]string),
|
||||
Keys: make(map[string]int),
|
||||
|
@ -103,6 +110,9 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
|
|||
}
|
||||
|
||||
qParam := k.serf.DefaultQueryParams()
|
||||
if opts != nil {
|
||||
qParam.RelayFactor = opts.RelayFactor
|
||||
}
|
||||
queryResp, err := k.serf.Query(qName, req, qParam)
|
||||
if err != nil {
|
||||
return resp, err
|
||||
|
@ -127,30 +137,42 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) {
|
|||
// responses from each of them, returning a list of messages from each node
|
||||
// and any applicable error conditions.
|
||||
func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) {
|
||||
return k.InstallKeyWithOptions(key, nil)
|
||||
}
|
||||
|
||||
func (k *KeyManager) InstallKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
|
||||
k.l.Lock()
|
||||
defer k.l.Unlock()
|
||||
|
||||
return k.handleKeyRequest(key, installKeyQuery)
|
||||
return k.handleKeyRequest(key, installKeyQuery, opts)
|
||||
}
|
||||
|
||||
// UseKey handles broadcasting a primary key change to all members in the
|
||||
// cluster, and gathering any response messages. If successful, there should
|
||||
// be an empty KeyResponse returned.
|
||||
func (k *KeyManager) UseKey(key string) (*KeyResponse, error) {
|
||||
return k.UseKeyWithOptions(key, nil)
|
||||
}
|
||||
|
||||
func (k *KeyManager) UseKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
|
||||
k.l.Lock()
|
||||
defer k.l.Unlock()
|
||||
|
||||
return k.handleKeyRequest(key, useKeyQuery)
|
||||
return k.handleKeyRequest(key, useKeyQuery, opts)
|
||||
}
|
||||
|
||||
// RemoveKey handles broadcasting a key to the cluster for removal. Each member
|
||||
// will receive this event, and if they have the key in their keyring, remove
|
||||
// it. If any errors are encountered, RemoveKey will collect and relay them.
|
||||
func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
|
||||
return k.RemoveKeyWithOptions(key, nil)
|
||||
}
|
||||
|
||||
func (k *KeyManager) RemoveKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
|
||||
k.l.Lock()
|
||||
defer k.l.Unlock()
|
||||
|
||||
return k.handleKeyRequest(key, removeKeyQuery)
|
||||
return k.handleKeyRequest(key, removeKeyQuery, opts)
|
||||
}
|
||||
|
||||
// ListKeys is used to collect installed keys from members in a Serf cluster
|
||||
|
@ -159,8 +181,12 @@ func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
|
|||
// Since having multiple keys installed can cause performance penalties in some
|
||||
// cases, it's important to verify this information and remove unneeded keys.
|
||||
func (k *KeyManager) ListKeys() (*KeyResponse, error) {
|
||||
return k.ListKeysWithOptions(nil)
|
||||
}
|
||||
|
||||
func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse, error) {
|
||||
k.l.RLock()
|
||||
defer k.l.RUnlock()
|
||||
|
||||
return k.handleKeyRequest("", listKeysQuery)
|
||||
}
|
||||
return k.handleKeyRequest("", listKeysQuery, opts)
|
||||
}
|
|
@ -2,8 +2,10 @@ package serf
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
)
|
||||
|
||||
// messageType are the types of gossip messages Serf will send along
|
||||
|
@ -20,6 +22,7 @@ const (
|
|||
messageConflictResponseType
|
||||
messageKeyRequestType
|
||||
messageKeyResponseType
|
||||
messageRelayType
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -75,15 +78,16 @@ type messageUserEvent struct {
|
|||
|
||||
// messageQuery is used for query events
|
||||
type messageQuery struct {
|
||||
LTime LamportTime // Event lamport time
|
||||
ID uint32 // Query ID, randomly generated
|
||||
Addr []byte // Source address, used for a direct reply
|
||||
Port uint16 // Source port, used for a direct reply
|
||||
Filters [][]byte // Potential query filters
|
||||
Flags uint32 // Used to provide various flags
|
||||
Timeout time.Duration // Maximum time between delivery and response
|
||||
Name string // Query name
|
||||
Payload []byte // Query payload
|
||||
LTime LamportTime // Event lamport time
|
||||
ID uint32 // Query ID, randomly generated
|
||||
Addr []byte // Source address, used for a direct reply
|
||||
Port uint16 // Source port, used for a direct reply
|
||||
Filters [][]byte // Potential query filters
|
||||
Flags uint32 // Used to provide various flags
|
||||
RelayFactor uint8 // Used to set the number of duplicate relayed responses
|
||||
Timeout time.Duration // Maximum time between delivery and response
|
||||
Name string // Query name
|
||||
Payload []byte // Query payload
|
||||
}
|
||||
|
||||
// Ack checks if the ack flag is set
|
||||
|
@ -136,6 +140,28 @@ func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
|
|||
return buf.Bytes(), err
|
||||
}
|
||||
|
||||
// relayHeader is used to store the end destination of a relayed message
|
||||
type relayHeader struct {
|
||||
DestAddr net.UDPAddr
|
||||
}
|
||||
|
||||
// encodeRelayMessage wraps a message in the messageRelayType, adding the length and
|
||||
// address of the end recipient to the front of the message
|
||||
func encodeRelayMessage(t messageType, addr net.UDPAddr, msg interface{}) ([]byte, error) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
handle := codec.MsgpackHandle{}
|
||||
encoder := codec.NewEncoder(buf, &handle)
|
||||
|
||||
buf.WriteByte(uint8(messageRelayType))
|
||||
if err := encoder.Encode(relayHeader{DestAddr: addr}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buf.WriteByte(uint8(t))
|
||||
err := encoder.Encode(msg)
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
|
||||
func encodeFilter(f filterType, filt interface{}) ([]byte, error) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
buf.WriteByte(uint8(f))
|
||||
|
|
|
@ -24,6 +24,10 @@ type QueryParam struct {
|
|||
// send an ack.
|
||||
RequestAck bool
|
||||
|
||||
// RelayFactor controls the number of duplicate responses to relay
|
||||
// back to the sender through other nodes for redundancy.
|
||||
RelayFactor uint8
|
||||
|
||||
// The timeout limits how long the query is left open. If not provided,
|
||||
// then a default timeout is used based on the configuration of Serf
|
||||
Timeout time.Duration
|
||||
|
@ -93,6 +97,10 @@ type QueryResponse struct {
|
|||
// respCh is used to send a response from a node
|
||||
respCh chan NodeResponse
|
||||
|
||||
// acks/responses are used to track the nodes that have sent an ack/response
|
||||
acks map[string]struct{}
|
||||
responses map[string]struct{}
|
||||
|
||||
closed bool
|
||||
closeLock sync.Mutex
|
||||
}
|
||||
|
@ -100,13 +108,15 @@ type QueryResponse struct {
|
|||
// newQueryResponse is used to construct a new query response
|
||||
func newQueryResponse(n int, q *messageQuery) *QueryResponse {
|
||||
resp := &QueryResponse{
|
||||
deadline: time.Now().Add(q.Timeout),
|
||||
id: q.ID,
|
||||
lTime: q.LTime,
|
||||
respCh: make(chan NodeResponse, n),
|
||||
deadline: time.Now().Add(q.Timeout),
|
||||
id: q.ID,
|
||||
lTime: q.LTime,
|
||||
respCh: make(chan NodeResponse, n),
|
||||
responses: make(map[string]struct{}),
|
||||
}
|
||||
if q.Ack() {
|
||||
resp.ackCh = make(chan string, n)
|
||||
resp.acks = make(map[string]struct{})
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import (
|
|||
// version to memberlist below.
|
||||
const (
|
||||
ProtocolVersionMin uint8 = 2
|
||||
ProtocolVersionMax = 4
|
||||
ProtocolVersionMax = 5
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -499,15 +499,16 @@ func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryRes
|
|||
|
||||
// Create a message
|
||||
q := messageQuery{
|
||||
LTime: s.queryClock.Time(),
|
||||
ID: uint32(rand.Int31()),
|
||||
Addr: local.Addr,
|
||||
Port: local.Port,
|
||||
Filters: filters,
|
||||
Flags: flags,
|
||||
Timeout: params.Timeout,
|
||||
Name: name,
|
||||
Payload: payload,
|
||||
LTime: s.queryClock.Time(),
|
||||
ID: uint32(rand.Int31()),
|
||||
Addr: local.Addr,
|
||||
Port: local.Port,
|
||||
Filters: filters,
|
||||
Flags: flags,
|
||||
RelayFactor: params.RelayFactor,
|
||||
Timeout: params.Timeout,
|
||||
Name: name,
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
// Encode the query
|
||||
|
@ -1242,14 +1243,15 @@ func (s *Serf) handleQuery(query *messageQuery) bool {
|
|||
|
||||
if s.config.EventCh != nil {
|
||||
s.config.EventCh <- &Query{
|
||||
LTime: query.LTime,
|
||||
Name: query.Name,
|
||||
Payload: query.Payload,
|
||||
serf: s,
|
||||
id: query.ID,
|
||||
addr: query.Addr,
|
||||
port: query.Port,
|
||||
deadline: time.Now().Add(query.Timeout),
|
||||
LTime: query.LTime,
|
||||
Name: query.Name,
|
||||
Payload: query.Payload,
|
||||
serf: s,
|
||||
id: query.ID,
|
||||
addr: query.Addr,
|
||||
port: query.Port,
|
||||
deadline: time.Now().Add(query.Timeout),
|
||||
relayFactor: query.RelayFactor,
|
||||
}
|
||||
}
|
||||
return rebroadcast
|
||||
|
@ -1282,18 +1284,30 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
|
|||
|
||||
// Process each type of response
|
||||
if resp.Ack() {
|
||||
// Exit early if this is a duplicate ack
|
||||
if _, ok := query.acks[resp.From]; ok {
|
||||
return
|
||||
}
|
||||
|
||||
metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
|
||||
select {
|
||||
case query.ackCh <- resp.From:
|
||||
query.acks[resp.From] = struct{}{}
|
||||
default:
|
||||
s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping")
|
||||
s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping")
|
||||
}
|
||||
} else {
|
||||
// Exit early if this is a duplicate response
|
||||
if _, ok := query.responses[resp.From]; ok {
|
||||
return
|
||||
}
|
||||
|
||||
metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
|
||||
select {
|
||||
case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
|
||||
query.responses[resp.From] = struct{}{}
|
||||
default:
|
||||
s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping")
|
||||
s.logger.Printf("[WARN] serf: Failed to deliver query response, dropping")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -625,11 +625,11 @@
|
|||
"revisionTime": "2016-08-09T01:42:04Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "vLyudzMEdik8IpRY1H2vRa2PeLU=",
|
||||
"checksumSHA1": "EhESUBqb9Kot4rzZu2l/oAJoYCU=",
|
||||
"comment": "v0.7.0-66-g6c4672d",
|
||||
"path": "github.com/hashicorp/serf/serf",
|
||||
"revision": "114430d8210835d66defdc31cdc176c58e060005",
|
||||
"revisionTime": "2016-08-09T01:42:04Z"
|
||||
"revision": "34e94dbd8faa991710b442c22ad6ad37c8b44c3b",
|
||||
"revisionTime": "2017-02-02T01:56:25Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=",
|
||||
|
|
|
@ -138,6 +138,9 @@ Available in Consul 0.7.2 and later, the keyring endpoint supports the
|
|||
This endpoint supports the use of ACL tokens using either the `X-CONSUL-TOKEN`
|
||||
header or the `?token=` query parameter.
|
||||
|
||||
Added in Consul 0.7.4, this endpoint supports the `?relay-factor=` query parameter.
|
||||
See the [Keyring Command](/docs/commands/keyring.html#_relay_factor) for more details.
|
||||
|
||||
#### GET Method
|
||||
|
||||
Using the `GET` method, this endpoint will list the gossip encryption keys
|
||||
|
|
|
@ -48,6 +48,10 @@ The list of available flags are:
|
|||
|
||||
* `-token=""` - ACL token to use during requests. Defaults to that of the agent.
|
||||
|
||||
* `-relay-factor` - Added in Consul 0.7.4, setting this to a non-zero value will
|
||||
cause nodes to relay their response to the operation through this many
|
||||
randomly-chosen other nodes in the cluster. The maximum allowed value is 5.
|
||||
|
||||
* `-rpc-addr` - Address to the RPC server of the agent you want to contact
|
||||
to send this command. If this isn't specified, the command will contact
|
||||
"127.0.0.1:8400" which is the default RPC address of a Consul agent.
|
||||
|
|
Loading…
Reference in New Issue