From 08b1fea3798204bb5714c74eba975214e76efcdc Mon Sep 17 00:00:00 2001 From: Hans Hasselberg Date: Mon, 10 Aug 2020 22:24:15 +0200 Subject: [PATCH 1/2] thread local-only through the layers $ consul keyring -list -local-only ==> Gathering installed encryption keys... dc1 (LAN): aUlAW4ST3+vwseI61so24CoORkyjZofcmHk+j7QPSYQ= [1/1] --- agent/keyring.go | 4 +-- agent/keyring_test.go | 4 +-- agent/operator_endpoint.go | 2 +- agent/operator_endpoint_test.go | 46 ++++++++++++++++++++++++++++++--- 4 files changed, 47 insertions(+), 9 deletions(-) diff --git a/agent/keyring.go b/agent/keyring.go index 10817f5bd..48c1ebc0b 100644 --- a/agent/keyring.go +++ b/agent/keyring.go @@ -146,8 +146,8 @@ func ValidateLocalOnly(local bool, list bool) error { // ListKeys lists out all keys installed on the collective Consul cluster. This // includes both servers and clients in all DC's. -func (a *Agent) ListKeys(token string, relayFactor uint8) (*structs.KeyringResponses, error) { - args := structs.KeyringRequest{Operation: structs.KeyringList} +func (a *Agent) ListKeys(token string, localOnly bool, relayFactor uint8) (*structs.KeyringResponses, error) { + args := structs.KeyringRequest{Operation: structs.KeyringList, LocalOnly: localOnly} parseKeyringRequest(&args, token, relayFactor) return a.keyringProcess(&args) } diff --git a/agent/keyring_test.go b/agent/keyring_test.go index be3070454..2b160d4f7 100644 --- a/agent/keyring_test.go +++ b/agent/keyring_test.go @@ -280,13 +280,13 @@ func TestAgentKeyring_ACL(t *testing.T) { defer a.Shutdown() // List keys without access fails - _, err := a.ListKeys("", 0) + _, err := a.ListKeys("", false, 0) if err == nil || !strings.Contains(err.Error(), "denied") { t.Fatalf("expected denied error, got: %#v", err) } // List keys with access works - _, err = a.ListKeys("root", 0) + _, err = a.ListKeys("root", false, 0) if err != nil { t.Fatalf("err: %s", err) } diff --git a/agent/operator_endpoint.go b/agent/operator_endpoint.go index fb3c3a731..badcae55f 100644 --- a/agent/operator_endpoint.go +++ b/agent/operator_endpoint.go @@ -136,7 +136,7 @@ func (s *HTTPServer) KeyringInstall(resp http.ResponseWriter, req *http.Request, // KeyringList is used to list the keys installed in the cluster func (s *HTTPServer) KeyringList(resp http.ResponseWriter, req *http.Request, args *keyringArgs) (interface{}, error) { - responses, err := s.agent.ListKeys(args.Token, args.RelayFactor) + responses, err := s.agent.ListKeys(args.Token, args.LocalOnly, args.RelayFactor) if err != nil { return nil, err } diff --git a/agent/operator_endpoint_test.go b/agent/operator_endpoint_test.go index 71f69634b..b435410d7 100644 --- a/agent/operator_endpoint_test.go +++ b/agent/operator_endpoint_test.go @@ -95,7 +95,7 @@ func TestOperator_KeyringInstall(t *testing.T) { t.Fatalf("err: %s", err) } - listResponse, err := a.ListKeys("", 0) + listResponse, err := a.ListKeys("", false, 0) if err != nil { t.Fatalf("err: %s", err) } @@ -161,6 +161,44 @@ func TestOperator_KeyringList(t *testing.T) { t.Fatalf("bad: %v", ok) } } +func TestOperator_KeyringListLocalOnly(t *testing.T) { + t.Parallel() + key := "H3/9gBxcKKRf45CaI2DlRg==" + a := NewTestAgent(t, ` + encrypt = "`+key+`" + `) + defer a.Shutdown() + + req, _ := http.NewRequest("GET", "/v1/operator/keyring?local-only=1", nil) + resp := httptest.NewRecorder() + r, err := a.srv.OperatorKeyringEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + responses, ok := r.([]*structs.KeyringResponse) + if !ok { + t.Fatalf("err: %v", !ok) + } + + // Check that we only get a LAN response with the original key + if len(responses) != 1 { + for _, r := range responses { + fmt.Println(r) + } + t.Fatalf("bad: %d", len(responses)) + } + + // LAN + if len(responses[0].Keys) != 1 { + t.Fatalf("bad: %d", len(responses[1].Keys)) + } + if responses[0].WAN { + t.Fatalf("bad: %v", responses[1].WAN) + } + if _, ok := responses[0].Keys[key]; !ok { + t.Fatalf("bad: %v", ok) + } +} func TestOperator_KeyringRemove(t *testing.T) { t.Parallel() @@ -177,7 +215,7 @@ func TestOperator_KeyringRemove(t *testing.T) { } // Make sure the temp key is installed - list, err := a.ListKeys("", 0) + list, err := a.ListKeys("", false, 0) if err != nil { t.Fatalf("err: %v", err) } @@ -202,7 +240,7 @@ func TestOperator_KeyringRemove(t *testing.T) { } // Make sure the temp key has been removed - list, err = a.ListKeys("", 0) + list, err = a.ListKeys("", false, 0) if err != nil { t.Fatalf("err: %v", err) } @@ -246,7 +284,7 @@ func TestOperator_KeyringUse(t *testing.T) { } // Make sure only the new key remains - list, err := a.ListKeys("", 0) + list, err := a.ListKeys("", false, 0) if err != nil { t.Fatalf("err: %v", err) } From e0297b6e99f469a0a13f527893690d9051f6dc22 Mon Sep 17 00:00:00 2001 From: Hans Hasselberg Date: Tue, 11 Aug 2020 13:35:48 +0200 Subject: [PATCH 2/2] Refactor keyring ops: * changes some functions to return data instead of modifying pointer arguments * renames globalRPC() to keyringRPCs() to make its purpose more clear * restructures KeyringOperation() to make it more understandable --- agent/consul/internal_endpoint.go | 105 ++++++++++++++++-------------- agent/consul/rpc.go | 29 ++++----- agent/consul/server_test.go | 4 +- agent/router/router.go | 18 +++++ 4 files changed, 89 insertions(+), 67 deletions(-) diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 720810f0d..c0cefd65a 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -250,6 +250,11 @@ func (m *Internal) KeyringOperation( args *structs.KeyringRequest, reply *structs.KeyringResponses) error { + // Error aggressively to be clear about LocalOnly behavior + if args.LocalOnly && args.Operation != structs.KeyringList { + return fmt.Errorf("argument error: LocalOnly can only be used for List operations") + } + // Check ACLs identity, rule, err := m.srv.ResolveTokenToIdentityAndAuthorizer(args.Token) if err != nil { @@ -277,44 +282,63 @@ func (m *Internal) KeyringOperation( } } - // Validate use of local-only - if args.LocalOnly && args.Operation != structs.KeyringList { - // Error aggressively to be clear about LocalOnly behavior - return fmt.Errorf("argument error: LocalOnly can only be used for List operations") - } + if args.LocalOnly || args.Forwarded || m.srv.serfWAN == nil { + // Handle operations that are localOnly, already forwarded or + // there is no serfWAN. If any of this is the case this + // operation shouldn't go out to other dcs or WAN pool. + reply.Responses = append(reply.Responses, m.executeKeyringOpLAN(args)...) + } else { + // Handle not already forwarded, non-local operations. - // args.LocalOnly should always be false for non-GET requests - if !args.LocalOnly { - // Only perform WAN keyring querying and RPC forwarding once - if !args.Forwarded && m.srv.serfWAN != nil { - args.Forwarded = true - m.executeKeyringOp(args, reply, true) - return m.srv.globalRPC("Internal.KeyringOperation", args, reply) + // Marking this as forwarded because this is what we are about + // to do. Prevents the same message from being fowarded by + // other servers. + args.Forwarded = true + reply.Responses = append(reply.Responses, m.executeKeyringOpWAN(args)) + reply.Responses = append(reply.Responses, m.executeKeyringOpLAN(args)...) + + dcs := m.srv.router.GetRemoteDatacenters(m.srv.config.Datacenter) + responses, err := m.srv.keyringRPCs("Internal.KeyringOperation", args, dcs) + if err != nil { + return err } + reply.Add(responses) } - - // Query the LAN keyring of this node's DC - m.executeKeyringOp(args, reply, false) return nil } -// executeKeyringOp executes the keyring-related operation in the request -// on either the WAN or LAN pools. -func (m *Internal) executeKeyringOp( - args *structs.KeyringRequest, - reply *structs.KeyringResponses, - wan bool) { - - if wan { - mgr := m.srv.KeyManagerWAN() - m.executeKeyringOpMgr(mgr, args, reply, wan, "") - } else { - segments := m.srv.LANSegments() - for name, segment := range segments { - mgr := segment.KeyManager() - m.executeKeyringOpMgr(mgr, args, reply, wan, name) - } +func (m *Internal) executeKeyringOpLAN(args *structs.KeyringRequest) []*structs.KeyringResponse { + responses := []*structs.KeyringResponse{} + segments := m.srv.LANSegments() + for name, segment := range segments { + mgr := segment.KeyManager() + serfResp, err := m.executeKeyringOpMgr(mgr, args) + resp := translateKeyResponseToKeyringResponse(serfResp, m.srv.config.Datacenter, err) + resp.Segment = name + responses = append(responses, &resp) } + return responses +} + +func (m *Internal) executeKeyringOpWAN(args *structs.KeyringRequest) *structs.KeyringResponse { + mgr := m.srv.KeyManagerWAN() + serfResp, err := m.executeKeyringOpMgr(mgr, args) + resp := translateKeyResponseToKeyringResponse(serfResp, m.srv.config.Datacenter, err) + resp.WAN = true + return &resp +} + +func translateKeyResponseToKeyringResponse(keyresponse *serf.KeyResponse, datacenter string, err error) structs.KeyringResponse { + resp := structs.KeyringResponse{ + Datacenter: datacenter, + Messages: keyresponse.Messages, + Keys: keyresponse.Keys, + NumNodes: keyresponse.NumNodes, + } + if err != nil { + resp.Error = err.Error() + } + return resp } // executeKeyringOpMgr executes the appropriate keyring-related function based on @@ -323,9 +347,7 @@ func (m *Internal) executeKeyringOp( func (m *Internal) executeKeyringOpMgr( mgr *serf.KeyManager, args *structs.KeyringRequest, - reply *structs.KeyringResponses, - wan bool, - segment string) { +) (*serf.KeyResponse, error) { var serfResp *serf.KeyResponse var err error @@ -341,20 +363,7 @@ func (m *Internal) executeKeyringOpMgr( serfResp, err = mgr.RemoveKeyWithOptions(args.Key, opts) } - errStr := "" - if err != nil { - errStr = err.Error() - } - - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - WAN: wan, - Datacenter: m.srv.config.Datacenter, - Segment: segment, - Messages: serfResp.Messages, - Keys: serfResp.Keys, - NumNodes: serfResp.NumNodes, - Error: errStr, - }) + return serfResp, err } // aclAccessorID is used to convert an ACLToken's secretID to its accessorID for non- diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 4ab54a8b9..0a520dcee 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -635,22 +635,17 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ return nil } -// globalRPC is used to forward an RPC request to one server in each datacenter. -// This will only error for RPC-related errors. Otherwise, application-level -// errors can be sent in the response objects. -func (s *Server) globalRPC(method string, args interface{}, - reply structs.CompoundResponse) error { +// keyringRPCs is used to forward an RPC request to a server in each dc. This +// will only error for RPC-related errors. Otherwise, application-level errors +// can be sent in the response objects. +func (s *Server) keyringRPCs(method string, args interface{}, dcs []string) (*structs.KeyringResponses, error) { - // Make a new request into each datacenter - dcs := s.router.GetDatacenters() - - replies, total := 0, len(dcs) - errorCh := make(chan error, total) - respCh := make(chan interface{}, total) + errorCh := make(chan error, len(dcs)) + respCh := make(chan *structs.KeyringResponses, len(dcs)) for _, dc := range dcs { go func(dc string) { - rr := reply.New() + rr := &structs.KeyringResponses{} if err := s.forwardDC(method, dc, args, &rr); err != nil { errorCh <- err return @@ -659,16 +654,16 @@ func (s *Server) globalRPC(method string, args interface{}, }(dc) } - for replies < total { + responses := &structs.KeyringResponses{} + for i := 0; i < len(dcs); i++ { select { case err := <-errorCh: - return err + return nil, err case rr := <-respCh: - reply.Add(rr) - replies++ + responses.Add(rr) } } - return nil + return responses, nil } type raftEncoder func(structs.MessageType, interface{}) ([]byte, error) diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 18d61618f..a6516b87d 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -1286,7 +1286,7 @@ func (r *fakeGlobalResp) New() interface{} { return struct{}{} } -func TestServer_globalRPCErrors(t *testing.T) { +func TestServer_keyringRPCs(t *testing.T) { t.Parallel() dir1, s1 := testServerDC(t, "dc1") defer os.RemoveAll(dir1) @@ -1298,7 +1298,7 @@ func TestServer_globalRPCErrors(t *testing.T) { }) // Check that an error from a remote DC is returned - err := s1.globalRPC("Bad.Method", nil, &fakeGlobalResp{}) + _, err := s1.keyringRPCs("Bad.Method", nil, []string{s1.config.Datacenter}) if err == nil { t.Fatalf("should have errored") } diff --git a/agent/router/router.go b/agent/router/router.go index 64df6a003..f8fa52ff3 100644 --- a/agent/router/router.go +++ b/agent/router/router.go @@ -406,6 +406,24 @@ func (r *Router) GetDatacenters() []string { return dcs } +// GetRemoteDatacenters returns a list of remote datacenters known to the router, sorted by +// name. +func (r *Router) GetRemoteDatacenters(local string) []string { + r.RLock() + defer r.RUnlock() + + dcs := make([]string, 0, len(r.managers)) + for dc := range r.managers { + if dc == local { + continue + } + dcs = append(dcs, dc) + } + + sort.Strings(dcs) + return dcs +} + // HasDatacenter checks whether dc is defined in WAN func (r *Router) HasDatacenter(dc string) bool { r.RLock()