From f6b5fc8c087257ac6e7a02c8529809a3f2df5d53 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Sun, 28 Sep 2014 12:35:51 -0700 Subject: [PATCH] consul: cross-dc key rotation works --- command/agent/rpc.go | 80 +++++++++++++++------- command/agent/rpc_client.go | 32 ++++----- command/keyring.go | 131 +++++++++++++++++++++++++----------- consul/internal_endpoint.go | 98 +++++++++++++++------------ consul/rpc.go | 13 ---- consul/structs/structs.go | 3 + 6 files changed, 222 insertions(+), 135 deletions(-) diff --git a/command/agent/rpc.go b/command/agent/rpc.go index 5d6c5c8a7..288a97cf3 100644 --- a/command/agent/rpc.go +++ b/command/agent/rpc.go @@ -113,12 +113,33 @@ type keyRequest struct { Key string } +type KeyringEntry struct { + Datacenter string + Pool string + Key string + Count int +} + +type KeyringMessage struct { + Datacenter string + Pool string + Node string + Message string +} + +type KeyringInfo struct { + Datacenter string + Pool string + NumNodes int + NumResp int + NumErr int + Error string +} + type keyResponse struct { - Messages map[string]string - Keys map[string]int - NumNodes int - NumResp int - NumErr int + Keys []KeyringEntry + Messages []KeyringMessage + Info []KeyringInfo } type membersResponse struct { @@ -607,7 +628,7 @@ func (i *AgentRPC) handleReload(client *rpcClient, seq uint64) error { func (i *AgentRPC) handleKeyring(client *rpcClient, seq uint64, cmd string) error { var req keyRequest var queryResp *structs.KeyringResponses - var resp keyResponse + var r keyResponse var err error if cmd != listKeysCommand { @@ -636,30 +657,43 @@ func (i *AgentRPC) handleKeyring(client *rpcClient, seq uint64, cmd string) erro Error: errToString(err), } - if resp.Messages == nil { - resp.Messages = make(map[string]string) - } - if resp.Keys == nil { - resp.Keys = make(map[string]int) - } - for _, kr := range queryResp.Responses { - for node, msg := range kr.Messages { - resp.Messages[node+"."+kr.Datacenter] = msg + var pool string + if kr.WAN { + 
pool = "WAN" + } else { + pool = "LAN" + } + for node, message := range kr.Messages { + msg := KeyringMessage{ + Datacenter: kr.Datacenter, + Pool: pool, + Node: node, + Message: message, + } + r.Messages = append(r.Messages, msg) } for key, qty := range kr.Keys { - if _, ok := resp.Keys[key]; ok { - resp.Keys[key] += qty - } else { - resp.Keys[key] = qty + k := KeyringEntry{ + Datacenter: kr.Datacenter, + Pool: pool, + Key: key, + Count: qty, } + r.Keys = append(r.Keys, k) } - resp.NumNodes += kr.NumNodes - resp.NumResp += kr.NumResp - resp.NumErr += kr.NumErr + info := KeyringInfo{ + Datacenter: kr.Datacenter, + Pool: pool, + NumNodes: kr.NumNodes, + NumResp: kr.NumResp, + NumErr: kr.NumErr, + Error: kr.Error, + } + r.Info = append(r.Info, info) } - return client.Send(&header, resp) + return client.Send(&header, r) } // Used to convert an error to a string representation diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index 36a54057e..454f427a8 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -176,47 +176,47 @@ func (c *RPCClient) WANMembers() ([]Member, error) { return resp.Members, err } -func (c *RPCClient) ListKeys() (map[string]int, int, map[string]string, error) { +func (c *RPCClient) ListKeys() (keyResponse, error) { header := requestHeader{ Command: listKeysCommand, Seq: c.getSeq(), } - resp := new(keyResponse) - err := c.genericRPC(&header, nil, resp) - return resp.Keys, resp.NumNodes, resp.Messages, err + var resp keyResponse + err := c.genericRPC(&header, nil, &resp) + return resp, err } -func (c *RPCClient) InstallKey(key string) (map[string]string, error) { +func (c *RPCClient) InstallKey(key string) (keyResponse, error) { header := requestHeader{ Command: installKeyCommand, Seq: c.getSeq(), } req := keyRequest{key} - resp := new(keyResponse) - err := c.genericRPC(&header, &req, resp) - return resp.Messages, err + var resp keyResponse + err := c.genericRPC(&header, &req, &resp) + return resp, err } 
-func (c *RPCClient) UseKey(key string) (map[string]string, error) { +func (c *RPCClient) UseKey(key string) (keyResponse, error) { header := requestHeader{ Command: useKeyCommand, Seq: c.getSeq(), } req := keyRequest{key} - resp := new(keyResponse) - err := c.genericRPC(&header, &req, resp) - return resp.Messages, err + var resp keyResponse + err := c.genericRPC(&header, &req, &resp) + return resp, err } -func (c *RPCClient) RemoveKey(key string) (map[string]string, error) { +func (c *RPCClient) RemoveKey(key string) (keyResponse, error) { header := requestHeader{ Command: removeKeyCommand, Seq: c.getSeq(), } req := keyRequest{key} - resp := new(keyResponse) - err := c.genericRPC(&header, &req, resp) - return resp.Messages, err + var resp keyResponse + err := c.genericRPC(&header, &req, &resp) + return resp, err } // Leave is used to trigger a graceful leave and shutdown diff --git a/command/keyring.go b/command/keyring.go index eaa298dcd..8d0dc9545 100644 --- a/command/keyring.go +++ b/command/keyring.go @@ -14,6 +14,12 @@ import ( "github.com/ryanuber/columnize" ) +const ( + installKeyCommand = "install" + useKeyCommand = "use" + removeKeyCommand = "remove" +) + // KeyringCommand is a Command implementation that handles querying, installing, // and removing gossip encryption keys from a keyring. 
type KeyringCommand struct { @@ -107,79 +113,106 @@ func (c *KeyringCommand) Run(args []string) int { if listKeys { c.Ui.Info("Asking all members for installed keys...") - return c.listKeysOperation(client.ListKeys) + return c.listKeysOperation(client) } if installKey != "" { c.Ui.Info("Installing new gossip encryption key...") - if rval := c.keyOperation(installKey, client.InstallKey); rval != 0 { - return rval + r, err := client.InstallKey(installKey) + if err != nil { + c.Ui.Error(fmt.Sprintf("error: %s", err)) + return 1 } - c.Ui.Info("Successfully installed key!") - return 0 + rval := c.handleResponse(r.Info, r.Messages, r.Keys) + if rval == 0 { + c.Ui.Info("Successfully installed new key!") + } + return rval } if useKey != "" { c.Ui.Info("Changing primary gossip encryption key...") - if rval := c.keyOperation(useKey, client.UseKey); rval != 0 { - return rval + r, err := client.UseKey(useKey) + if err != nil { + c.Ui.Error(fmt.Sprintf("error: %s", err)) + return 1 } - c.Ui.Info("Successfully changed primary key!") - return 0 + rval := c.handleResponse(r.Info, r.Messages, r.Keys) + if rval == 0 { + c.Ui.Info("Successfully changed primary encryption key!") + } + return rval } if removeKey != "" { c.Ui.Info("Removing gossip encryption key...") - if rval := c.keyOperation(removeKey, client.RemoveKey); rval != 0 { - return rval + r, err := client.RemoveKey(removeKey) + if err != nil { + c.Ui.Error(fmt.Sprintf("error: %s", err)) + return 1 } - c.Ui.Info("Successfully removed gossip encryption key!") - return 0 + rval := c.handleResponse(r.Info, r.Messages, r.Keys) + if rval == 0 { + c.Ui.Info("Successfully removed encryption key!") + } + return rval } // Should never make it here return 0 } -// keyFunc is a function which manipulates gossip encryption keyrings. This is -// used for key installation, removal, and primary key changes. 
-type keyFunc func(string) (map[string]string, error) +func (c *KeyringCommand) handleResponse( + info []agent.KeyringInfo, + messages []agent.KeyringMessage, + entries []agent.KeyringEntry) int { -// keyOperation is a unified process for manipulating the gossip keyrings. -func (c *KeyringCommand) keyOperation(key string, fn keyFunc) int { - var out []string + var rval int - failures, err := fn(key) - - if err != nil { - if len(failures) > 0 { - for node, msg := range failures { - out = append(out, fmt.Sprintf("failed: %s | %s", node, msg)) + for _, i := range info { + if i.Error != "" { + pool := i.Pool + if pool != "WAN" { + pool = i.Datacenter + " (LAN)" } - c.Ui.Error(columnize.SimpleFormat(out)) + + c.Ui.Error("") + c.Ui.Error(fmt.Sprintf("%s error: %s", pool, i.Error)) + + var errors []string + for _, msg := range messages { + if msg.Datacenter != i.Datacenter || msg.Pool != i.Pool { + continue + } + errors = append(errors, fmt.Sprintf( + "failed: %s | %s", + msg.Node, + msg.Message)) + } + c.Ui.Error(columnize.SimpleFormat(errors)) + rval = 1 } - c.Ui.Error("") - c.Ui.Error(fmt.Sprintf("Error: %s", err)) - return 1 } - return 0 + return rval } -// listKeysFunc is a function which handles querying lists of gossip keys -type listKeysFunc func() (map[string]int, int, map[string]string, error) - // listKeysOperation is a unified process for querying and // displaying gossip keys. 
-func (c *KeyringCommand) listKeysOperation(fn listKeysFunc) int { +func (c *KeyringCommand) listKeysOperation(client *agent.RPCClient) int { var out []string - keys, numNodes, failures, err := fn() + resp, err := client.ListKeys() if err != nil { - if len(failures) > 0 { - for node, msg := range failures { - out = append(out, fmt.Sprintf("failed: %s | %s", node, msg)) + if len(resp.Messages) > 0 { + for _, msg := range resp.Messages { + out = append(out, fmt.Sprintf( + "failed: %s | %s | %s | %s", + msg.Datacenter, + msg.Pool, + msg.Node, + msg.Message)) } c.Ui.Error(columnize.SimpleFormat(out)) } @@ -187,8 +220,26 @@ func (c *KeyringCommand) listKeysOperation(fn listKeysFunc) int { c.Ui.Error(fmt.Sprintf("Failed gathering member keys: %s", err)) return 1 } - for key, num := range keys { - out = append(out, fmt.Sprintf("%s | [%d/%d]", key, num, numNodes)) + + entries := make(map[string]map[string]int) + for _, key := range resp.Keys { + var dc string + if key.Pool == "WAN" { + dc = key.Pool + } else { + dc = key.Datacenter + } + if _, ok := entries[dc]; !ok { + entries[dc] = make(map[string]int) + } + entries[dc][key.Key] = key.Count + } + for dc, keys := range entries { + out = append(out, "") + out = append(out, dc) + for key, count := range keys { + out = append(out, fmt.Sprintf("%s|[%d/%d]", key, count, count)) + } } c.Ui.Output(columnize.SimpleFormat(out)) diff --git a/consul/internal_endpoint.go b/consul/internal_endpoint.go index 690707611..2cd08a2c6 100644 --- a/consul/internal_endpoint.go +++ b/consul/internal_endpoint.go @@ -64,42 +64,69 @@ func (m *Internal) EventFire(args *structs.EventFireRequest, return m.srv.UserEvent(args.Name, args.Payload) } +// ingestKeyringResponse is a helper method to pick the relevant information +// from a Serf message and stuff it into a KeyringResponse. 
func (m *Internal) ingestKeyringResponse( - resp *serf.KeyResponse, - reply *structs.KeyringResponses) { + serfResp *serf.KeyResponse, + reply *structs.KeyringResponses, + err error, wan bool) { + + errStr := "" + if err != nil { + errStr = err.Error() + } reply.Responses = append(reply.Responses, &structs.KeyringResponse{ + WAN: wan, Datacenter: m.srv.config.Datacenter, - Messages: resp.Messages, - Keys: resp.Keys, - NumResp: resp.NumResp, - NumNodes: resp.NumNodes, - NumErr: resp.NumErr, + Messages: serfResp.Messages, + Keys: serfResp.Keys, + NumResp: serfResp.NumResp, + NumNodes: serfResp.NumNodes, + NumErr: serfResp.NumErr, + Error: errStr, }) } +func (m *Internal) forwardKeyring( + method string, + args *structs.KeyringRequest, + replies *structs.KeyringResponses) error { + + for dc, _ := range m.srv.remoteConsuls { + if dc == m.srv.config.Datacenter { + continue + } + rr := structs.KeyringResponses{} + if err := m.srv.forwardDC(method, dc, args, &rr); err != nil { + return err + } + for _, r := range rr.Responses { + replies.Responses = append(replies.Responses, r) + } + } + + return nil +} + // ListKeys will query the WAN and LAN gossip keyrings of all nodes, adding // results into a collective response as we go. func (m *Internal) ListKeys( args *structs.KeyringRequest, reply *structs.KeyringResponses) error { + m.srv.setQueryMeta(&reply.QueryMeta) + respLAN, err := m.srv.KeyManagerLAN().ListKeys() - if err != nil { - return err - } - m.ingestKeyringResponse(respLAN, reply) + m.ingestKeyringResponse(respLAN, reply, err, false) if !args.Forwarded { respWAN, err := m.srv.KeyManagerWAN().ListKeys() - if err != nil { - return err - } - m.ingestKeyringResponse(respWAN, reply) + m.ingestKeyringResponse(respWAN, reply, err, true) // Mark key rotation as being already forwarded, then forward. 
args.Forwarded = true - return m.srv.forwardAll("Internal.ListKeys", args, reply) + m.forwardKeyring("Internal.ListKeys", args, reply) } return nil @@ -112,20 +139,15 @@ func (m *Internal) InstallKey( reply *structs.KeyringResponses) error { respLAN, err := m.srv.KeyManagerLAN().InstallKey(args.Key) - if err != nil { - return err - } - m.ingestKeyringResponse(respLAN, reply) + m.ingestKeyringResponse(respLAN, reply, err, false) if !args.Forwarded { respWAN, err := m.srv.KeyManagerWAN().InstallKey(args.Key) - if err != nil { - return err - } - m.ingestKeyringResponse(respWAN, reply) + m.ingestKeyringResponse(respWAN, reply, err, true) + // Mark key rotation as being already forwarded, then forward. args.Forwarded = true - return m.srv.forwardAll("Internal.InstallKey", args, reply) + m.forwardKeyring("Internal.InstallKey", args, reply) } return nil @@ -138,20 +160,15 @@ func (m *Internal) UseKey( reply *structs.KeyringResponses) error { respLAN, err := m.srv.KeyManagerLAN().UseKey(args.Key) - if err != nil { - return err - } - m.ingestKeyringResponse(respLAN, reply) + m.ingestKeyringResponse(respLAN, reply, err, false) if !args.Forwarded { respWAN, err := m.srv.KeyManagerWAN().UseKey(args.Key) - if err != nil { - return err - } - m.ingestKeyringResponse(respWAN, reply) + m.ingestKeyringResponse(respWAN, reply, err, true) + // Mark key rotation as being already forwarded, then forward. 
args.Forwarded = true - return m.srv.forwardAll("Internal.UseKey", args, reply) + m.forwardKeyring("Internal.UseKey", args, reply) } return nil @@ -163,20 +180,15 @@ func (m *Internal) RemoveKey( reply *structs.KeyringResponses) error { respLAN, err := m.srv.KeyManagerLAN().RemoveKey(args.Key) - if err != nil { - return err - } - m.ingestKeyringResponse(respLAN, reply) + m.ingestKeyringResponse(respLAN, reply, err, false) if !args.Forwarded { respWAN, err := m.srv.KeyManagerWAN().RemoveKey(args.Key) - if err != nil { - return err - } - m.ingestKeyringResponse(respWAN, reply) + m.ingestKeyringResponse(respWAN, reply, err, true) + // Mark key rotation as being already forwarded, then forward. args.Forwarded = true - return m.srv.forwardAll("Internal.RemoveKey", args, reply) + m.forwardKeyring("Internal.RemoveKey", args, reply) } return nil diff --git a/consul/rpc.go b/consul/rpc.go index e5b5be6c5..cd5c36ebd 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -223,19 +223,6 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ return s.connPool.RPC(server.Addr, server.Version, method, args, reply) } -// forwardAll forwards a single RPC call to every known datacenter. -func (s *Server) forwardAll(method string, args, reply interface{}) error { - for dc, _ := range s.remoteConsuls { - if dc != s.config.Datacenter { - // Forward the RPC call. Even if an error is returned here, we still - // want to continue broadcasting to the remaining DC's to avoid - // network partitions completely killing us. 
- go s.forwardDC(method, dc, args, reply) - } - } - return nil -} - // raftApply is used to encode a message, run it through raft, and return // the FSM response along with any errors func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) { diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 3f029ee81..6e752ea4f 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -543,14 +543,17 @@ type KeyringRequest struct { // KeyringResponse is a unified key response and can be used for install, // remove, use, as well as listing key queries. type KeyringResponse struct { + WAN bool Datacenter string Messages map[string]string Keys map[string]int NumNodes int NumResp int NumErr int + Error string } type KeyringResponses struct { Responses []*KeyringResponse + QueryMeta }