From 21ce56e6f367339b9caf1fd9bf59f4d4af02b10d Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 1 Feb 2017 20:02:13 -0500 Subject: [PATCH 1/2] Update serf deps --- .../github.com/hashicorp/serf/serf/config.go | 3 +- .../hashicorp/serf/serf/delegate.go | 21 ++++ .../github.com/hashicorp/serf/serf/event.go | 104 ++++++++++++++---- .../hashicorp/serf/serf/keymanager.go | 38 ++++++- .../hashicorp/serf/serf/messages.go | 46 ++++++-- .../github.com/hashicorp/serf/serf/query.go | 18 ++- vendor/github.com/hashicorp/serf/serf/serf.go | 54 +++++---- vendor/vendor.json | 6 +- 8 files changed, 225 insertions(+), 65 deletions(-) diff --git a/vendor/github.com/hashicorp/serf/serf/config.go b/vendor/github.com/hashicorp/serf/serf/config.go index dfe878bbc..2403cea58 100644 --- a/vendor/github.com/hashicorp/serf/serf/config.go +++ b/vendor/github.com/hashicorp/serf/serf/config.go @@ -15,6 +15,7 @@ var ProtocolVersionMap map[uint8]uint8 func init() { ProtocolVersionMap = map[uint8]uint8{ + 5: 2, 4: 2, 3: 2, 2: 2, @@ -240,7 +241,7 @@ func DefaultConfig() *Config { EventBuffer: 512, QueryBuffer: 512, LogOutput: os.Stderr, - ProtocolVersion: ProtocolVersionMax, + ProtocolVersion: 4, ReapInterval: 15 * time.Second, RecentIntentTimeout: 5 * time.Minute, ReconnectInterval: 30 * time.Second, diff --git a/vendor/github.com/hashicorp/serf/serf/delegate.go b/vendor/github.com/hashicorp/serf/serf/delegate.go index d19ca3090..8f51cb7d0 100644 --- a/vendor/github.com/hashicorp/serf/serf/delegate.go +++ b/vendor/github.com/hashicorp/serf/serf/delegate.go @@ -1,9 +1,11 @@ package serf import ( + "bytes" "fmt" "github.com/armon/go-metrics" + "github.com/hashicorp/go-msgpack/codec" ) // delegate is the memberlist.Delegate implementation that Serf uses. @@ -83,6 +85,25 @@ func (d *delegate) NotifyMsg(buf []byte) { d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From) d.serf.handleQueryResponse(&resp) + case messageRelayType: + var header relayHeader + var handle codec.MsgpackHandle + reader := bytes.NewReader(buf[1:]) + decoder := codec.NewDecoder(reader, &handle) + if err := decoder.Decode(&header); err != nil { + d.serf.logger.Printf("[ERR] serf: Error decoding relay header: %s", err) + break + } + + // The remaining contents are the message itself, so forward that + raw := make([]byte, reader.Len()) + reader.Read(raw) + d.serf.logger.Printf("[DEBUG] serf: Relaying response to addr: %s", header.DestAddr.String()) + if err := d.serf.memberlist.SendTo(&header.DestAddr, raw); err != nil { + d.serf.logger.Printf("[ERR] serf: Error forwarding message to %s: %s", header.DestAddr.String(), err) + break + } + default: d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t) } diff --git a/vendor/github.com/hashicorp/serf/serf/event.go b/vendor/github.com/hashicorp/serf/serf/event.go index 8337e95ea..c6be9242e 100644 --- a/vendor/github.com/hashicorp/serf/serf/event.go +++ b/vendor/github.com/hashicorp/serf/serf/event.go @@ -2,6 +2,7 @@ package serf import ( "fmt" + "math/rand" "net" "sync" "time" @@ -95,18 +96,19 @@ func (u UserEvent) String() string { return fmt.Sprintf("user-event: %s", u.Name) } -// Query is the struct used EventQuery type events +// Query is the struct used by EventQuery type events type Query struct { LTime LamportTime Name string Payload []byte - serf *Serf - id uint32 // ID is not exported, since it may change - addr []byte // Address to respond to - port uint16 // Port to respond to - deadline time.Time // Must respond by this deadline - respLock sync.Mutex + serf *Serf + id uint32 // ID is not exported, since it may change + addr []byte // Address to respond to + port uint16 // Port to respond to + deadline time.Time // Must respond by this deadline + relayFactor uint8 // Number of duplicate responses to relay back to sender + respLock sync.Mutex } func (q *Query) EventType() EventType { @@ -145,24 +147,84 @@ func (q *Query) Respond(buf []byte) error { Payload: buf, } - // Format the response - raw, err := encodeMessage(messageQueryResponseType, &resp) - if err != nil { - return fmt.Errorf("Failed to format response: %v", err) + // Send a direct response + { + raw, err := encodeMessage(messageQueryResponseType, &resp) + if err != nil { + return fmt.Errorf("Failed to format response: %v", err) + } + + // Check the size limit + if len(raw) > q.serf.config.QueryResponseSizeLimit { + return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit) + } + + addr := net.UDPAddr{IP: q.addr, Port: int(q.port)} + if err := q.serf.memberlist.SendTo(&addr, raw); err != nil { + return err + } } - // Check the size limit - if len(raw) > q.serf.config.QueryResponseSizeLimit { - return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit) + // Relay the response through up to relayFactor other nodes + members := q.serf.Members() + if len(members) > 2 { + addr := net.UDPAddr{IP: q.addr, Port: int(q.port)} + raw, err := encodeRelayMessage(messageQueryResponseType, addr, &resp) + if err != nil { + return fmt.Errorf("Failed to format relayed response: %v", err) + } + + // Check the size limit + if len(raw) > q.serf.config.QueryResponseSizeLimit { + return fmt.Errorf("relayed response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit) + } + + relayMembers := kRandomMembers(int(q.relayFactor), members, func(m Member) bool { + return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == q.serf.LocalMember().Name + }) + + for _, m := range relayMembers { + relayAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)} + if err := q.serf.memberlist.SendTo(&relayAddr, raw); err != nil { + return err + } + } } - // Send the response - addr := net.UDPAddr{IP: q.addr, Port: int(q.port)} - if err := q.serf.memberlist.SendTo(&addr, raw); err != nil { - return err - } - - // Clera the deadline, response sent + // Clear the deadline, responses sent q.deadline = time.Time{} return nil } + +// kRandomMembers selects up to k members from a given list, optionally +// filtering by the given filterFunc +func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member { + n := len(members) + kMembers := make([]Member, 0, k) +OUTER: + // Probe up to 3*n times, with large n this is not necessary + // since k << n, but with small n we want search to be + // exhaustive + for i := 0; i < 3*n && len(kMembers) < k; i++ { + // Get random member + idx := rand.Intn(n) + member := members[idx] + + // Give the filter a shot at it. + if filterFunc != nil && filterFunc(member) { + continue OUTER + } + + // Check if we have this member already + for j := 0; j < len(kMembers); j++ { + if member.Name == kMembers[j].Name { + continue OUTER + } + } + + // Append the member + kMembers = append(kMembers, member) + } + + return kMembers +} diff --git a/vendor/github.com/hashicorp/serf/serf/keymanager.go b/vendor/github.com/hashicorp/serf/serf/keymanager.go index 72a319449..fd53182fc 100644 --- a/vendor/github.com/hashicorp/serf/serf/keymanager.go +++ b/vendor/github.com/hashicorp/serf/serf/keymanager.go @@ -33,6 +33,13 @@ type KeyResponse struct { Keys map[string]int } +// KeyRequestOptions is used to contain optional parameters for a keyring operation +type KeyRequestOptions struct { + // RelayFactor is the number of duplicate query responses to send by relaying through + // other nodes, for redundancy + RelayFactor uint8 +} + // streamKeyResp takes care of reading responses from a channel and composing // them into a KeyResponse. It will update a KeyResponse *in place* and // therefore has nothing to return. @@ -83,7 +90,7 @@ func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) { // handleKeyRequest performs query broadcasting to all members for any type of // key operation and manages gathering responses and packing them up into a // KeyResponse for uniform response handling. -func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) { +func (k *KeyManager) handleKeyRequest(key, query string, opts *KeyRequestOptions) (*KeyResponse, error) { resp := &KeyResponse{ Messages: make(map[string]string), Keys: make(map[string]int), @@ -103,6 +110,9 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) { } qParam := k.serf.DefaultQueryParams() + if opts != nil { + qParam.RelayFactor = opts.RelayFactor + } queryResp, err := k.serf.Query(qName, req, qParam) if err != nil { return resp, err @@ -127,30 +137,42 @@ func (k *KeyManager) handleKeyRequest(key, query string) (*KeyResponse, error) { // responses from each of them, returning a list of messages from each node // and any applicable error conditions. func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) { + return k.InstallKeyWithOptions(key, nil) +} + +func (k *KeyManager) InstallKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) { k.l.Lock() defer k.l.Unlock() - return k.handleKeyRequest(key, installKeyQuery) + return k.handleKeyRequest(key, installKeyQuery, opts) } // UseKey handles broadcasting a primary key change to all members in the // cluster, and gathering any response messages. If successful, there should // be an empty KeyResponse returned. func (k *KeyManager) UseKey(key string) (*KeyResponse, error) { + return k.UseKeyWithOptions(key, nil) +} + +func (k *KeyManager) UseKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) { k.l.Lock() defer k.l.Unlock() - return k.handleKeyRequest(key, useKeyQuery) + return k.handleKeyRequest(key, useKeyQuery, opts) } // RemoveKey handles broadcasting a key to the cluster for removal. Each member // will receive this event, and if they have the key in their keyring, remove // it. If any errors are encountered, RemoveKey will collect and relay them. func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) { + return k.RemoveKeyWithOptions(key, nil) +} + +func (k *KeyManager) RemoveKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) { k.l.Lock() defer k.l.Unlock() - return k.handleKeyRequest(key, removeKeyQuery) + return k.handleKeyRequest(key, removeKeyQuery, opts) } // ListKeys is used to collect installed keys from members in a Serf cluster @@ -159,8 +181,12 @@ func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) { // Since having multiple keys installed can cause performance penalties in some // cases, it's important to verify this information and remove unneeded keys. func (k *KeyManager) ListKeys() (*KeyResponse, error) { + return k.ListKeysWithOptions(nil) +} + +func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse, error) { k.l.RLock() defer k.l.RUnlock() - return k.handleKeyRequest("", listKeysQuery) -} + return k.handleKeyRequest("", listKeysQuery, opts) +} \ No newline at end of file diff --git a/vendor/github.com/hashicorp/serf/serf/messages.go b/vendor/github.com/hashicorp/serf/serf/messages.go index c90c96450..20df5b8e8 100644 --- a/vendor/github.com/hashicorp/serf/serf/messages.go +++ b/vendor/github.com/hashicorp/serf/serf/messages.go @@ -2,8 +2,10 @@ package serf import ( "bytes" - "github.com/hashicorp/go-msgpack/codec" + "net" "time" + + "github.com/hashicorp/go-msgpack/codec" ) // messageType are the types of gossip messages Serf will send along @@ -20,6 +22,7 @@ const ( messageConflictResponseType messageKeyRequestType messageKeyResponseType + messageRelayType ) const ( @@ -75,15 +78,16 @@ type messageUserEvent struct { // messageQuery is used for query events type messageQuery struct { - LTime LamportTime // Event lamport time - ID uint32 // Query ID, randomly generated - Addr []byte // Source address, used for a direct reply - Port uint16 // Source port, used for a direct reply - Filters [][]byte // Potential query filters - Flags uint32 // Used to provide various flags - Timeout time.Duration // Maximum time between delivery and response - Name string // Query name - Payload []byte // Query payload + LTime LamportTime // Event lamport time + ID uint32 // Query ID, randomly generated + Addr []byte // Source address, used for a direct reply + Port uint16 // Source port, used for a direct reply + Filters [][]byte // Potential query filters + Flags uint32 // Used to provide various flags + RelayFactor uint8 // Used to set the number of duplicate relayed responses + Timeout time.Duration // Maximum time between delivery and response + Name string // Query name + Payload []byte // Query payload } // Ack checks if the ack flag is set @@ -136,6 +140,28 @@ func encodeMessage(t messageType, msg interface{}) ([]byte, error) { return buf.Bytes(), err } +// relayHeader is used to store the end destination of a relayed message +type relayHeader struct { + DestAddr net.UDPAddr +} + +// encodeRelayMessage wraps a message in the messageRelayType, adding the length and +// address of the end recipient to the front of the message +func encodeRelayMessage(t messageType, addr net.UDPAddr, msg interface{}) ([]byte, error) { + buf := bytes.NewBuffer(nil) + handle := codec.MsgpackHandle{} + encoder := codec.NewEncoder(buf, &handle) + + buf.WriteByte(uint8(messageRelayType)) + if err := encoder.Encode(relayHeader{DestAddr: addr}); err != nil { + return nil, err + } + + buf.WriteByte(uint8(t)) + err := encoder.Encode(msg) + return buf.Bytes(), err +} + func encodeFilter(f filterType, filt interface{}) ([]byte, error) { buf := bytes.NewBuffer(nil) buf.WriteByte(uint8(f)) diff --git a/vendor/github.com/hashicorp/serf/serf/query.go b/vendor/github.com/hashicorp/serf/serf/query.go index f29a3b3c5..d758945c1 100644 --- a/vendor/github.com/hashicorp/serf/serf/query.go +++ b/vendor/github.com/hashicorp/serf/serf/query.go @@ -24,6 +24,10 @@ type QueryParam struct { // send an ack. RequestAck bool + // RelayFactor controls the number of duplicate responses to relay + // back to the sender through other nodes for redundancy. + RelayFactor uint8 + // The timeout limits how long the query is left open. If not provided, // then a default timeout is used based on the configuration of Serf Timeout time.Duration @@ -93,6 +97,10 @@ type QueryResponse struct { // respCh is used to send a response from a node respCh chan NodeResponse + // acks/responses are used to track the nodes that have sent an ack/response + acks map[string]struct{} + responses map[string]struct{} + closed bool closeLock sync.Mutex } @@ -100,13 +108,15 @@ type QueryResponse struct { // newQueryResponse is used to construct a new query response func newQueryResponse(n int, q *messageQuery) *QueryResponse { resp := &QueryResponse{ - deadline: time.Now().Add(q.Timeout), - id: q.ID, - lTime: q.LTime, - respCh: make(chan NodeResponse, n), + deadline: time.Now().Add(q.Timeout), + id: q.ID, + lTime: q.LTime, + respCh: make(chan NodeResponse, n), + responses: make(map[string]struct{}), } if q.Ack() { resp.ackCh = make(chan string, n) + resp.acks = make(map[string]struct{}) } return resp } diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index 424da0195..cb21f882c 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -25,7 +25,7 @@ import ( // version to memberlist below. const ( ProtocolVersionMin uint8 = 2 - ProtocolVersionMax = 4 + ProtocolVersionMax = 5 ) const ( @@ -499,15 +499,16 @@ func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryRes // Create a message q := messageQuery{ - LTime: s.queryClock.Time(), - ID: uint32(rand.Int31()), - Addr: local.Addr, - Port: local.Port, - Filters: filters, - Flags: flags, - Timeout: params.Timeout, - Name: name, - Payload: payload, + LTime: s.queryClock.Time(), + ID: uint32(rand.Int31()), + Addr: local.Addr, + Port: local.Port, + Filters: filters, + Flags: flags, + RelayFactor: params.RelayFactor, + Timeout: params.Timeout, + Name: name, + Payload: payload, } // Encode the query @@ -1242,14 +1243,15 @@ func (s *Serf) handleQuery(query *messageQuery) bool { if s.config.EventCh != nil { s.config.EventCh <- &Query{ - LTime: query.LTime, - Name: query.Name, - Payload: query.Payload, - serf: s, - id: query.ID, - addr: query.Addr, - port: query.Port, - deadline: time.Now().Add(query.Timeout), + LTime: query.LTime, + Name: query.Name, + Payload: query.Payload, + serf: s, + id: query.ID, + addr: query.Addr, + port: query.Port, + deadline: time.Now().Add(query.Timeout), + relayFactor: query.RelayFactor, } } return rebroadcast @@ -1282,18 +1284,30 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) { // Process each type of response if resp.Ack() { + // Exit early if this is a duplicate ack + if _, ok := query.acks[resp.From]; ok { + return + } + metrics.IncrCounter([]string{"serf", "query_acks"}, 1) select { case query.ackCh <- resp.From: + query.acks[resp.From] = struct{}{} default: - s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping") + s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping") } } else { + // Exit early if this is a duplicate response + if _, ok := query.responses[resp.From]; ok { + return + } + metrics.IncrCounter([]string{"serf", "query_responses"}, 1) select { case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}: + query.responses[resp.From] = struct{}{} default: - s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping") + s.logger.Printf("[WARN] serf: Failed to deliver query response, dropping") } } } diff --git a/vendor/vendor.json b/vendor/vendor.json index e1f9a145f..55d0b0bcb 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -625,11 +625,11 @@ "revisionTime": "2016-08-09T01:42:04Z" }, { - "checksumSHA1": "vLyudzMEdik8IpRY1H2vRa2PeLU=", + "checksumSHA1": "EhESUBqb9Kot4rzZu2l/oAJoYCU=", "comment": "v0.7.0-66-g6c4672d", "path": "github.com/hashicorp/serf/serf", - "revision": "114430d8210835d66defdc31cdc176c58e060005", - "revisionTime": "2016-08-09T01:42:04Z" + "revision": "34e94dbd8faa991710b442c22ad6ad37c8b44c3b", + "revisionTime": "2017-02-02T01:56:25Z" }, { "checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=", From 4fc3bd3abf398394a21569d4f77029c2c1511897 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 1 Feb 2017 21:42:41 -0500 Subject: [PATCH 2/2] Added -relay-factor param to keyring operations --- command/agent/keyring.go | 29 +++++++++---- command/agent/keyring_test.go | 16 +++---- command/agent/operator_endpoint.go | 31 ++++++++++--- command/agent/operator_endpoint_test.go | 43 ++++++++++++++++--- command/agent/rpc.go | 19 ++++---- command/agent/rpc_client.go | 17 ++++---- command/agent/rpc_client_test.go | 14 +++--- command/keyring.go | 22 ++++++++-- command/keyring_test.go | 11 +++++ consul/internal_endpoint.go | 9 ++-- consul/structs/structs.go | 9 ++-- .../docs/agent/http/operator.html.markdown | 3 ++ .../docs/commands/keyring.html.markdown | 4 ++ 13 files changed, 162 insertions(+), 65 deletions(-) diff --git a/command/agent/keyring.go b/command/agent/keyring.go index e7b8aa4ce..c870e2dbc 100644 --- a/command/agent/keyring.go +++ b/command/agent/keyring.go @@ -121,31 +121,44 @@ func (a *Agent) keyringProcess(args *structs.KeyringRequest) (*structs.KeyringRe return &reply, nil } +// ParseRelayFactor validates and converts the given relay factor to uint8 +func ParseRelayFactor(n int) (uint8, error) { + if n < 0 || n > 5 { + return 0, fmt.Errorf("Relay factor must be in range: [0, 5]") + } + return uint8(n), nil +} + // ListKeys lists out all keys installed on the collective Consul cluster. This // includes both servers and clients in all DC's. -func (a *Agent) ListKeys(token string) (*structs.KeyringResponses, error) { +func (a *Agent) ListKeys(token string, relayFactor uint8) (*structs.KeyringResponses, error) { args := structs.KeyringRequest{Operation: structs.KeyringList} - args.Token = token + parseKeyringRequest(&args, token, relayFactor) return a.keyringProcess(&args) } // InstallKey installs a new gossip encryption key -func (a *Agent) InstallKey(key, token string) (*structs.KeyringResponses, error) { +func (a *Agent) InstallKey(key, token string, relayFactor uint8) (*structs.KeyringResponses, error) { args := structs.KeyringRequest{Key: key, Operation: structs.KeyringInstall} - args.Token = token + parseKeyringRequest(&args, token, relayFactor) return a.keyringProcess(&args) } // UseKey changes the primary encryption key used to encrypt messages -func (a *Agent) UseKey(key, token string) (*structs.KeyringResponses, error) { +func (a *Agent) UseKey(key, token string, relayFactor uint8) (*structs.KeyringResponses, error) { args := structs.KeyringRequest{Key: key, Operation: structs.KeyringUse} - args.Token = token + parseKeyringRequest(&args, token, relayFactor) return a.keyringProcess(&args) } // RemoveKey will remove a gossip encryption key from the keyring -func (a *Agent) RemoveKey(key, token string) (*structs.KeyringResponses, error) { +func (a *Agent) RemoveKey(key, token string, relayFactor uint8) (*structs.KeyringResponses, error) { args := structs.KeyringRequest{Key: key, Operation: structs.KeyringRemove} - args.Token = token + parseKeyringRequest(&args, token, relayFactor) return a.keyringProcess(&args) } + +func parseKeyringRequest(req *structs.KeyringRequest, token string, relayFactor uint8) { + req.Token = token + req.RelayFactor = relayFactor +} diff --git a/command/agent/keyring_test.go b/command/agent/keyring_test.go index f364b6fa8..aa60a6303 100644 --- a/command/agent/keyring_test.go +++ b/command/agent/keyring_test.go @@ -132,49 +132,49 @@ func TestAgentKeyring_ACL(t *testing.T) { testutil.WaitForLeader(t, agent.RPC, "dc1") // List keys without access fails - _, err := agent.ListKeys("") + _, err := agent.ListKeys("", 0) if err == nil || !strings.Contains(err.Error(), "denied") { t.Fatalf("expected denied error, got: %#v", err) } // List keys with access works - _, err = agent.ListKeys("root") + _, err = agent.ListKeys("root", 0) if err != nil { t.Fatalf("err: %s", err) } // Install without access fails - _, err = agent.InstallKey(key2, "") + _, err = agent.InstallKey(key2, "", 0) if err == nil || !strings.Contains(err.Error(), "denied") { t.Fatalf("expected denied error, got: %#v", err) } // Install with access works - _, err = agent.InstallKey(key2, "root") + _, err = agent.InstallKey(key2, "root", 0) if err != nil { t.Fatalf("err: %s", err) } // Use without access fails - _, err = agent.UseKey(key2, "") + _, err = agent.UseKey(key2, "", 0) if err == nil || !strings.Contains(err.Error(), "denied") { t.Fatalf("expected denied error, got: %#v", err) } // Use with access works - _, err = agent.UseKey(key2, "root") + _, err = agent.UseKey(key2, "root", 0) if err != nil { t.Fatalf("err: %s", err) } // Remove without access fails - _, err = agent.RemoveKey(key1, "") + _, err = agent.RemoveKey(key1, "", 0) if err == nil || !strings.Contains(err.Error(), "denied") { t.Fatalf("expected denied error, got: %#v", err) } // Remove with access works - _, err = agent.RemoveKey(key1, "root") + _, err = agent.RemoveKey(key1, "root", 0) if err != nil { t.Fatalf("err: %s", err) } diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index ac5377a61..c45f89aa5 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/consul/structs" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/raft" + "strconv" ) // OperatorRaftConfiguration is used to inspect the current Raft configuration. @@ -59,8 +60,9 @@ func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Reques } type keyringArgs struct { - Key string - Token string + Key string + Token string + RelayFactor uint8 } // OperatorKeyringEndpoint handles keyring operations (install, list, use, remove) @@ -75,6 +77,23 @@ func (s *HTTPServer) OperatorKeyringEndpoint(resp http.ResponseWriter, req *http } s.parseToken(req, &args.Token) + // Parse relay factor + if relayFactor := req.URL.Query().Get("relay-factor"); relayFactor != "" { + n, err := strconv.Atoi(relayFactor) + if err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Error parsing relay factor: %v", err))) + return nil, nil + } + + args.RelayFactor, err = ParseRelayFactor(n) + if err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Invalid relay factor: %v", err))) + return nil, nil + } + } + // Switch on the method switch req.Method { case "GET": @@ -93,7 +112,7 @@ func (s *HTTPServer) OperatorKeyringEndpoint(resp http.ResponseWriter, req *http // KeyringInstall is used to install a new gossip encryption key into the cluster func (s *HTTPServer) KeyringInstall(resp http.ResponseWriter, req *http.Request, args *keyringArgs) (interface{}, error) { - responses, err := s.agent.InstallKey(args.Key, args.Token) + responses, err := s.agent.InstallKey(args.Key, args.Token, args.RelayFactor) if err != nil { return nil, err } @@ -103,7 +122,7 @@ func (s *HTTPServer) KeyringInstall(resp http.ResponseWriter, req *http.Request, // KeyringList is used to list the keys installed in the cluster func (s *HTTPServer) KeyringList(resp http.ResponseWriter, req *http.Request, args *keyringArgs) (interface{}, error) { - responses, err := s.agent.ListKeys(args.Token) + responses, err := s.agent.ListKeys(args.Token, args.RelayFactor) if err != nil { return nil, err } @@ -113,7 +132,7 @@ func (s *HTTPServer) KeyringList(resp http.ResponseWriter, req *http.Request, ar // KeyringRemove is used to list the keys installed in the cluster func (s *HTTPServer) KeyringRemove(resp http.ResponseWriter, req *http.Request, args *keyringArgs) (interface{}, error) { - responses, err := s.agent.RemoveKey(args.Key, args.Token) + responses, err := s.agent.RemoveKey(args.Key, args.Token, args.RelayFactor) if err != nil { return nil, err } @@ -123,7 +142,7 @@ func (s *HTTPServer) KeyringRemove(resp http.ResponseWriter, req *http.Request, // KeyringUse is used to change the primary gossip encryption key func (s *HTTPServer) KeyringUse(resp http.ResponseWriter, req *http.Request, args *keyringArgs) (interface{}, error) { - responses, err := s.agent.UseKey(args.Key, args.Token) + responses, err := s.agent.UseKey(args.Key, args.Token, args.RelayFactor) if err != nil { return nil, err } diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index 9aff08a4b..4b1dd5412 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -77,7 +77,7 @@ func TestOperator_KeyringInstall(t *testing.T) { t.Fatalf("err: %s", err) } - listResponse, err := srv.agent.ListKeys("") + listResponse, err := srv.agent.ListKeys("", 0) if err != nil { t.Fatalf("err: %s", err) } @@ -155,13 +155,13 @@ func TestOperator_KeyringRemove(t *testing.T) { c.EncryptKey = key } httpTestWithConfig(t, func(srv *HTTPServer) { - _, err := srv.agent.InstallKey(tempKey, "") + _, err := srv.agent.InstallKey(tempKey, "", 0) if err != nil { t.Fatalf("err: %v", err) } // Make sure the temp key is installed - list, err := srv.agent.ListKeys("") + list, err := srv.agent.ListKeys("", 0) if err != nil { t.Fatalf("err: %v", err) } @@ -191,7 +191,7 @@ func TestOperator_KeyringRemove(t *testing.T) { } // Make sure the temp key has been removed - list, err = srv.agent.ListKeys("") + list, err = srv.agent.ListKeys("", 0) if err != nil { t.Fatalf("err: %v", err) } @@ -217,7 +217,7 @@ func TestOperator_KeyringUse(t *testing.T) { c.EncryptKey = oldKey } httpTestWithConfig(t, func(srv *HTTPServer) { - if _, err := srv.agent.InstallKey(newKey, ""); err != nil { + if _, err := srv.agent.InstallKey(newKey, "", 0); err != nil { t.Fatalf("err: %v", err) } @@ -233,12 +233,12 @@ func TestOperator_KeyringUse(t *testing.T) { t.Fatalf("err: %s", err) } - if _, err := srv.agent.RemoveKey(oldKey, ""); err != nil { + if _, err := srv.agent.RemoveKey(oldKey, "", 0); err != nil { t.Fatalf("err: %v", err) } // Make sure only the new key remains - list, err := srv.agent.ListKeys("") + list, err := srv.agent.ListKeys("", 0) if err != nil { t.Fatalf("err: %v", err) } @@ -256,3 +256,32 @@ func TestOperator_KeyringUse(t *testing.T) { } }, configFunc) } + +func TestOperator_Keyring_InvalidRelayFactor(t *testing.T) { + key := "H3/9gBxcKKRf45CaI2DlRg==" + configFunc := func(c *Config) { + c.EncryptKey = key + } + httpTestWithConfig(t, func(srv *HTTPServer) { + cases := map[string]string{ + "999": "Relay factor must be in range", + "asdf": "Error parsing relay factor", + } + for relayFactor, errString := range cases { + req, err := http.NewRequest("GET", "/v1/operator/keyring?relay-factor="+relayFactor, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.OperatorKeyringEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + body := resp.Body.String() + if !strings.Contains(body, errString) { + t.Fatalf("bad: %v", body) + } + } + }, configFunc) +} diff --git a/command/agent/rpc.go b/command/agent/rpc.go index bcca2c332..47276d274 100644 --- a/command/agent/rpc.go +++ b/command/agent/rpc.go @@ -106,7 +106,8 @@ type joinResponse struct { } type keyringRequest struct { - Key string + Key string + RelayFactor uint8 } type KeyringEntry struct { @@ -604,21 +605,21 @@ func (i *AgentRPC) handleKeyring(client *rpcClient, seq uint64, cmd, token strin var r keyringResponse var err error - if cmd != listKeysCommand { - if err = client.dec.Decode(&req); err != nil { - return fmt.Errorf("decode failed: %v", err) - } + if err = client.dec.Decode(&req); err != nil { + return fmt.Errorf("decode failed: %v", err) } + i.agent.logger.Printf("[INFO] agent: Sending rpc command with relay factor %d", req.RelayFactor) + switch cmd { case listKeysCommand: - queryResp, err = i.agent.ListKeys(token) + queryResp, err = i.agent.ListKeys(token, req.RelayFactor) case installKeyCommand: - queryResp, err = i.agent.InstallKey(req.Key, token) + queryResp, err = i.agent.InstallKey(req.Key, token, req.RelayFactor) case useKeyCommand: - queryResp, err = i.agent.UseKey(req.Key, token) + queryResp, err = i.agent.UseKey(req.Key, token, req.RelayFactor) case removeKeyCommand: - queryResp, err = i.agent.RemoveKey(req.Key, token) + queryResp, err = i.agent.RemoveKey(req.Key, token, req.RelayFactor) default: respHeader := responseHeader{Seq: seq, Error: unsupportedCommand} client.Send(&respHeader, nil) diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index 744866b41..ecd97aecc 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -194,48 +194,49 @@ func (c *RPCClient) WANMembers() ([]Member, error) { return resp.Members, err } -func (c *RPCClient) ListKeys(token string) (keyringResponse, error) { +func (c *RPCClient) ListKeys(token string, relayFactor uint8) (keyringResponse, error) { header := requestHeader{ Command: listKeysCommand, Seq: c.getSeq(), Token: token, } + req := keyringRequest{RelayFactor: relayFactor} var resp keyringResponse - err := c.genericRPC(&header, nil, &resp) + err := c.genericRPC(&header, req, &resp) return resp, err } -func (c *RPCClient) InstallKey(key, token string) (keyringResponse, error) { +func (c *RPCClient) InstallKey(key, token string, relayFactor uint8) (keyringResponse, error) { header := requestHeader{ Command: installKeyCommand, Seq: c.getSeq(), Token: token, } - req := keyringRequest{key} + req := keyringRequest{Key: key, RelayFactor: relayFactor} var resp keyringResponse err := c.genericRPC(&header, &req, &resp) return resp, err } -func (c *RPCClient) UseKey(key, token string) (keyringResponse, error) { +func (c *RPCClient) UseKey(key, token string, relayFactor uint8) (keyringResponse, error) { header := requestHeader{ Command: useKeyCommand, Seq: c.getSeq(), Token: token, } - req := keyringRequest{key} + req := keyringRequest{Key: key, RelayFactor: relayFactor} var resp keyringResponse err := c.genericRPC(&header, &req, &resp) return resp, err } -func (c *RPCClient) RemoveKey(key, token string) (keyringResponse, error) { +func (c *RPCClient) RemoveKey(key, token string, relayFactor uint8) (keyringResponse, error) { header := requestHeader{ Command: removeKeyCommand, Seq: c.getSeq(), Token: token, } - req := keyringRequest{key} + req := keyringRequest{Key: key, RelayFactor: relayFactor} var resp keyringResponse err := c.genericRPC(&header, &req, &resp) return resp, err diff --git a/command/agent/rpc_client_test.go b/command/agent/rpc_client_test.go index a6feb12f5..6ace0bb6f 100644 --- a/command/agent/rpc_client_test.go +++ b/command/agent/rpc_client_test.go @@ -371,7 +371,7 @@ func TestRPCClientInstallKey(t *testing.T) { }) // install key2 - r, err := p1.client.InstallKey(key2, "") + r, err := p1.client.InstallKey(key2, "", 0) if err != nil { t.Fatalf("err: %s", err) } @@ -402,7 +402,7 @@ func TestRPCClientUseKey(t *testing.T) { defer p1.Close() // add a second key to the ring - r, err := p1.client.InstallKey(key2, "") + r, err := p1.client.InstallKey(key2, "", 0) if err != nil { t.Fatalf("err: %s", err) } @@ -423,21 +423,21 @@ func TestRPCClientUseKey(t *testing.T) { }) // can't remove key1 yet - r, err = p1.client.RemoveKey(key1, "") + r, err = p1.client.RemoveKey(key1, "", 0) if err != nil { t.Fatalf("err: %s", err) } keyringError(t, r) // change primary key - r, err = p1.client.UseKey(key2, "") + r, err = p1.client.UseKey(key2, "", 0) if err != nil { t.Fatalf("err: %s", err) } keyringSuccess(t, r) // can remove key1 now - r, err = p1.client.RemoveKey(key1, "") + r, err = p1.client.RemoveKey(key1, "", 0) if err != nil { t.Fatalf("err: %s", err) } @@ -450,7 +450,7 @@ func TestRPCClientKeyOperation_encryptionDisabled(t *testing.T) { }) defer p1.Close() - r, err := p1.client.ListKeys("") + r, err := p1.client.ListKeys("", 0) if err != nil { t.Fatalf("err: %s", err) } @@ -458,7 +458,7 @@ func TestRPCClientKeyOperation_encryptionDisabled(t *testing.T) { } func listKeys(t *testing.T, c *RPCClient) map[string]map[string]int { - resp, err := c.ListKeys("") + resp, err := c.ListKeys("", 0) if err != nil { t.Fatalf("err: %s", err) } diff --git a/command/keyring.go b/command/keyring.go index 3a47cb935..863326f99 100644 --- a/command/keyring.go +++ b/command/keyring.go @@ -18,6 +18,7 @@ type KeyringCommand struct { func (c *KeyringCommand) Run(args []string) int { var installKey, useKey, removeKey, token string var listKeys bool + var relay int cmdFlags := flag.NewFlagSet("keys", flag.ContinueOnError) cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } @@ -27,6 +28,7 @@ func (c *KeyringCommand) Run(args []string) int { cmdFlags.StringVar(&removeKey, "remove", "", "remove key") cmdFlags.BoolVar(&listKeys, "list", false, "list keys") cmdFlags.StringVar(&token, "token", "", "acl token") + cmdFlags.IntVar(&relay, "relay-factor", 0, "relay factor") rpcAddr := RPCAddrFlag(cmdFlags) if err := cmdFlags.Parse(args); err != nil { @@ -56,6 +58,13 @@ func (c *KeyringCommand) Run(args []string) int { return 1 } + // Validate the relay factor + relayFactor, err := agent.ParseRelayFactor(relay) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing relay factor: %s", err)) + return 1 + } + // All other operations will require a client connection client, err := RPCClient(*rpcAddr) if err != nil { @@ -66,7 +75,7 @@ func (c *KeyringCommand) Run(args []string) int { if listKeys { c.Ui.Info("Gathering installed encryption keys...") - r, err := client.ListKeys(token) + r, err := client.ListKeys(token, relayFactor) if err != nil { c.Ui.Error(fmt.Sprintf("error: %s", err)) return 1 @@ -80,7 +89,7 @@ func (c *KeyringCommand) Run(args []string) int { if installKey != "" { c.Ui.Info("Installing new gossip encryption key...") - r, err := client.InstallKey(installKey, token) + r, err := client.InstallKey(installKey, token, relayFactor) if err != nil { c.Ui.Error(fmt.Sprintf("error: %s", err)) return 1 @@ -90,7 +99,7 @@ func (c *KeyringCommand) Run(args []string) int { if useKey != "" { c.Ui.Info("Changing primary gossip encryption key...") - r, err := client.UseKey(useKey, token) + r, err := client.UseKey(useKey, token, relayFactor) if err != nil { c.Ui.Error(fmt.Sprintf("error: %s", err)) return 1 @@ -100,7 +109,7 @@ func (c *KeyringCommand) Run(args []string) int { if removeKey != "" { c.Ui.Info("Removing gossip encryption key...") - r, err := client.RemoveKey(removeKey, token) + r, err := client.RemoveKey(removeKey, token, relayFactor) if err != nil { c.Ui.Error(fmt.Sprintf("error: %s", err)) return 1 @@ -206,6 +215,11 @@ Options: not currently the primary key. -token="" ACL token to use during requests. Defaults to that of the agent. + -relay-factor Added in Consul 0.7.4, setting this to a non-zero + value will cause nodes to relay their response to + the operation through this many randomly-chosen + other nodes in the cluster. The maximum allowed + value is 5. -use= Change the primary encryption key, which is used to encrypt messages. The key must already be installed before this operation can succeed. diff --git a/command/keyring_test.go b/command/keyring_test.go index bb8691ebb..77027c47f 100644 --- a/command/keyring_test.go +++ b/command/keyring_test.go @@ -89,6 +89,17 @@ func TestKeyringCommandRun_failedConnection(t *testing.T) { } } +func TestKeyringCommandRun_invalidRelayFactor(t *testing.T) { + ui := new(cli.MockUi) + c := &KeyringCommand{Ui: ui} + + args := []string{"-list", "-relay-factor=6"} + code := c.Run(args) + if code != 1 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } +} + func listKeys(t *testing.T, addr string) string { ui := new(cli.MockUi) c := &KeyringCommand{Ui: ui} diff --git a/consul/internal_endpoint.go b/consul/internal_endpoint.go index 2d0c05961..2be6de14c 100644 --- a/consul/internal_endpoint.go +++ b/consul/internal_endpoint.go @@ -147,15 +147,16 @@ func (m *Internal) executeKeyringOp( mgr = m.srv.KeyManagerLAN() } + opts := &serf.KeyRequestOptions{RelayFactor: args.RelayFactor} switch args.Operation { case structs.KeyringList: - serfResp, err = mgr.ListKeys() + serfResp, err = mgr.ListKeysWithOptions(opts) case structs.KeyringInstall: - serfResp, err = mgr.InstallKey(args.Key) + serfResp, err = mgr.InstallKeyWithOptions(args.Key, opts) case structs.KeyringUse: - serfResp, err = mgr.UseKey(args.Key) + serfResp, err = mgr.UseKeyWithOptions(args.Key, opts) case structs.KeyringRemove: - serfResp, err = mgr.RemoveKey(args.Key) + serfResp, err = mgr.RemoveKeyWithOptions(args.Key, opts) } errStr := "" diff --git a/consul/structs/structs.go b/consul/structs/structs.go index c4e134eeb..13c67b3d5 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -1000,10 +1000,11 @@ const ( // KeyringRequest encapsulates a request to modify an encryption keyring. // It can be used for install, remove, or use key type operations. type KeyringRequest struct { - Operation KeyringOp - Key string - Datacenter string - Forwarded bool + Operation KeyringOp + Key string + Datacenter string + Forwarded bool + RelayFactor uint8 QueryOptions } diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown index 81525c614..7b6bdd52e 100644 --- a/website/source/docs/agent/http/operator.html.markdown +++ b/website/source/docs/agent/http/operator.html.markdown @@ -138,6 +138,9 @@ Available in Consul 0.7.2 and later, the keyring endpoint supports the This endpoint supports the use of ACL tokens using either the `X-CONSUL-TOKEN` header or the `?token=` query parameter. +Added in Consul 0.7.4, this endpoint supports the `?relay-factor=` query parameter. +See the [Keyring Command](/docs/commands/keyring.html#_relay_factor) for more details. + #### GET Method Using the `GET` method, this endpoint will list the gossip encryption keys diff --git a/website/source/docs/commands/keyring.html.markdown b/website/source/docs/commands/keyring.html.markdown index b2dba7def..b34c3dbfc 100644 --- a/website/source/docs/commands/keyring.html.markdown +++ b/website/source/docs/commands/keyring.html.markdown @@ -48,6 +48,10 @@ The list of available flags are: * `-token=""` - ACL token to use during requests. Defaults to that of the agent. +* `-relay-factor` - Added in Consul 0.7.4, setting this to a non-zero value will + cause nodes to relay their response to the operation through this many + randomly-chosen other nodes in the cluster. The maximum allowed value is 5. + * `-rpc-addr` - Address to the RPC server of the agent you want to contact to send this command. If this isn't specified, the command will contact "127.0.0.1:8400" which is the default RPC address of a Consul agent.