[sync oss] api: add peering api module (#12911)
This commit is contained in:
parent
8dc68002f9
commit
4cd68b4534
|
@ -105,7 +105,7 @@ func init() {
|
||||||
registerEndpoint("/v1/operator/autopilot/state", []string{"GET"}, (*HTTPHandlers).OperatorAutopilotState)
|
registerEndpoint("/v1/operator/autopilot/state", []string{"GET"}, (*HTTPHandlers).OperatorAutopilotState)
|
||||||
registerEndpoint("/v1/peering/token", []string{"POST"}, (*HTTPHandlers).PeeringGenerateToken)
|
registerEndpoint("/v1/peering/token", []string{"POST"}, (*HTTPHandlers).PeeringGenerateToken)
|
||||||
registerEndpoint("/v1/peering/initiate", []string{"POST"}, (*HTTPHandlers).PeeringInitiate)
|
registerEndpoint("/v1/peering/initiate", []string{"POST"}, (*HTTPHandlers).PeeringInitiate)
|
||||||
registerEndpoint("/v1/peering/", []string{"GET"}, (*HTTPHandlers).PeeringRead)
|
registerEndpoint("/v1/peering/", []string{"GET", "DELETE"}, (*HTTPHandlers).PeeringEndpoint)
|
||||||
registerEndpoint("/v1/peerings", []string{"GET"}, (*HTTPHandlers).PeeringList)
|
registerEndpoint("/v1/peerings", []string{"GET"}, (*HTTPHandlers).PeeringList)
|
||||||
registerEndpoint("/v1/query", []string{"GET", "POST"}, (*HTTPHandlers).PreparedQueryGeneral)
|
registerEndpoint("/v1/query", []string{"GET", "POST"}, (*HTTPHandlers).PreparedQueryGeneral)
|
||||||
// specific prepared query endpoints have more complex rules for allowed methods, so
|
// specific prepared query endpoints have more complex rules for allowed methods, so
|
||||||
|
|
|
@ -8,8 +8,8 @@ import (
|
||||||
"github.com/hashicorp/consul/proto/pbpeering"
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeeringRead fetches a peering that matches the request parameters.
|
// PeeringEndpoint handles GET, DELETE on v1/peering/name
|
||||||
func (s *HTTPHandlers) PeeringRead(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPHandlers) PeeringEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
name, err := getPathSuffixUnescaped(req.URL.Path, "/v1/peering/")
|
name, err := getPathSuffixUnescaped(req.URL.Path, "/v1/peering/")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -23,10 +23,24 @@ func (s *HTTPHandlers) PeeringRead(resp http.ResponseWriter, req *http.Request)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Switch on the method
|
||||||
|
switch req.Method {
|
||||||
|
case "GET":
|
||||||
|
return s.peeringRead(resp, req, name, entMeta.PartitionOrEmpty())
|
||||||
|
case "DELETE":
|
||||||
|
return s.peeringDelete(resp, req, name, entMeta.PartitionOrEmpty())
|
||||||
|
default:
|
||||||
|
return nil, MethodNotAllowedError{req.Method, []string{"GET", "DELETE"}}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// peeringRead fetches a peering that matches the name and partition.
|
||||||
|
// This assumes that the name and partition parameters are valid
|
||||||
|
func (s *HTTPHandlers) peeringRead(resp http.ResponseWriter, req *http.Request, name, partition string) (interface{}, error) {
|
||||||
args := pbpeering.PeeringReadRequest{
|
args := pbpeering.PeeringReadRequest{
|
||||||
Name: name,
|
Name: name,
|
||||||
Datacenter: s.agent.config.Datacenter,
|
Datacenter: s.agent.config.Datacenter,
|
||||||
Partition: entMeta.PartitionOrEmpty(), // should be "" in OSS
|
Partition: partition, // should be "" in OSS
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := s.agent.rpcClientPeering.PeeringRead(req.Context(), &args)
|
result, err := s.agent.rpcClientPeering.PeeringRead(req.Context(), &args)
|
||||||
|
@ -116,3 +130,21 @@ func (s *HTTPHandlers) PeeringInitiate(resp http.ResponseWriter, req *http.Reque
|
||||||
|
|
||||||
return s.agent.rpcClientPeering.Initiate(req.Context(), &args)
|
return s.agent.rpcClientPeering.Initiate(req.Context(), &args)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// peeringDelete initiates a deletion for a peering that matches the name and partition.
|
||||||
|
// This assumes that the name and partition parameters are valid.
|
||||||
|
func (s *HTTPHandlers) peeringDelete(resp http.ResponseWriter, req *http.Request, name, partition string) (interface{}, error) {
|
||||||
|
args := pbpeering.PeeringDeleteRequest{
|
||||||
|
Name: name,
|
||||||
|
Datacenter: s.agent.config.Datacenter,
|
||||||
|
Partition: partition, // should be "" in OSS
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := s.agent.rpcClientPeering.PeeringDelete(req.Context(), &args)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(peering) -- today pbpeering.PeeringDeleteResponse is a {} so the result below is actually {}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
|
@ -43,3 +43,34 @@ func TestHTTP_Peering_GenerateToken_OSS_Failure(t *testing.T) {
|
||||||
require.Contains(t, string(body), "Partitions are a Consul Enterprise feature")
|
require.Contains(t, string(body), "Partitions are a Consul Enterprise feature")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHTTP_PeeringEndpoint_OSS_Failure(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
a := NewTestAgent(t, "")
|
||||||
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
t.Run("Doesn't allow partitions on PeeringEndpoint in OSS HTTP requests", func(t *testing.T) {
|
||||||
|
req, err := http.NewRequest("GET", "/v1/peering/foo?partition=foo", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
a.srv.h.ServeHTTP(resp, req)
|
||||||
|
|
||||||
|
require.Equal(t, http.StatusBadRequest, resp.Code)
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
require.Contains(t, string(body), "Partitions are a Consul Enterprise feature")
|
||||||
|
|
||||||
|
req2, err2 := http.NewRequest("DELETE", "/v1/peering/foo?partition=foo", nil)
|
||||||
|
require.NoError(t, err2)
|
||||||
|
resp2 := httptest.NewRecorder()
|
||||||
|
a.srv.h.ServeHTTP(resp2, req2)
|
||||||
|
|
||||||
|
require.Equal(t, http.StatusBadRequest, resp2.Code)
|
||||||
|
body2, _ := io.ReadAll(resp2.Body)
|
||||||
|
require.Contains(t, string(body2), "Partitions are a Consul Enterprise feature")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -195,6 +195,41 @@ func TestHTTP_Peering_Initiate(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHTTP_Peering_MethodNotAllowed(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
a := NewTestAgent(t, "")
|
||||||
|
|
||||||
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Insert peerings directly to state store.
|
||||||
|
// Note that the state store holds reference to the underlying
|
||||||
|
// variables; do not modify them after writing.
|
||||||
|
foo := &pbpeering.PeeringWriteRequest{
|
||||||
|
Peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
State: pbpeering.PeeringState_INITIAL,
|
||||||
|
PeerCAPems: nil,
|
||||||
|
PeerServerName: "fooservername",
|
||||||
|
PeerServerAddresses: []string{"addr1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, err := a.rpcClientPeering.PeeringWrite(ctx, foo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("PUT", "/v1/peering/foo", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
a.srv.h.ServeHTTP(resp, req)
|
||||||
|
require.Equal(t, http.StatusMethodNotAllowed, resp.Code)
|
||||||
|
}
|
||||||
|
|
||||||
func TestHTTP_Peering_Read(t *testing.T) {
|
func TestHTTP_Peering_Read(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
@ -257,6 +292,76 @@ func TestHTTP_Peering_Read(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHTTP_Peering_Delete(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
a := NewTestAgent(t, "")
|
||||||
|
|
||||||
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Insert peerings directly to state store.
|
||||||
|
// Note that the state store holds reference to the underlying
|
||||||
|
// variables; do not modify them after writing.
|
||||||
|
foo := &pbpeering.PeeringWriteRequest{
|
||||||
|
Peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
State: pbpeering.PeeringState_INITIAL,
|
||||||
|
PeerCAPems: nil,
|
||||||
|
PeerServerName: "fooservername",
|
||||||
|
PeerServerAddresses: []string{"addr1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, err := a.rpcClientPeering.PeeringWrite(ctx, foo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Run("read existing token before attempting delete", func(t *testing.T) {
|
||||||
|
req, err := http.NewRequest("GET", "/v1/peering/foo", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
a.srv.h.ServeHTTP(resp, req)
|
||||||
|
require.Equal(t, http.StatusOK, resp.Code)
|
||||||
|
|
||||||
|
// TODO(peering): replace with API types
|
||||||
|
var pbresp pbpeering.Peering
|
||||||
|
require.NoError(t, json.NewDecoder(resp.Body).Decode(&pbresp))
|
||||||
|
|
||||||
|
require.Equal(t, foo.Peering.Name, pbresp.Name)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("delete the existing token we just read", func(t *testing.T) {
|
||||||
|
req, err := http.NewRequest("DELETE", "/v1/peering/foo", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
a.srv.h.ServeHTTP(resp, req)
|
||||||
|
require.Equal(t, http.StatusOK, resp.Code)
|
||||||
|
require.Equal(t, "{}", resp.Body.String())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("now the token is deleted, a read should 404", func(t *testing.T) {
|
||||||
|
req, err := http.NewRequest("GET", "/v1/peering/foo", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
a.srv.h.ServeHTTP(resp, req)
|
||||||
|
require.Equal(t, http.StatusNotFound, resp.Code)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("delete a token that does not exist", func(t *testing.T) {
|
||||||
|
req, err := http.NewRequest("DELETE", "/v1/peering/baz", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
a.srv.h.ServeHTTP(resp, req)
|
||||||
|
|
||||||
|
// TODO(peering): it may be a security concern, but do we want to say 404 here?
|
||||||
|
require.Equal(t, http.StatusOK, resp.Code)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestHTTP_Peering_List(t *testing.T) {
|
func TestHTTP_Peering_List(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
|
|
@ -51,6 +51,23 @@ func makeACLClient(t *testing.T) (*Client, *testutil.TestServer) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeClientWithCA(t *testing.T) (*Client, *testutil.TestServer) {
|
||||||
|
return makeClientWithConfig(t,
|
||||||
|
func(c *Config) {
|
||||||
|
c.TLSConfig = TLSConfig{
|
||||||
|
Address: "consul.test",
|
||||||
|
CAFile: "../test/client_certs/rootca.crt",
|
||||||
|
CertFile: "../test/client_certs/client.crt",
|
||||||
|
KeyFile: "../test/client_certs/client.key",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
func(c *testutil.TestServerConfig) {
|
||||||
|
c.CAFile = "../test/client_certs/rootca.crt"
|
||||||
|
c.CertFile = "../test/client_certs/server.crt"
|
||||||
|
c.KeyFile = "../test/client_certs/server.key"
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func makeClientWithConfig(
|
func makeClientWithConfig(
|
||||||
t *testing.T,
|
t *testing.T,
|
||||||
cb1 configCallback,
|
cb1 configCallback,
|
||||||
|
|
|
@ -0,0 +1,213 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PeeringState enumerates all the states a peering can be in
|
||||||
|
type PeeringState int32
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Undefined represents an unset value for PeeringState during
|
||||||
|
// writes.
|
||||||
|
UNDEFINED PeeringState = 0
|
||||||
|
// INITIAL Initial means a Peering has been initialized and is awaiting
|
||||||
|
// acknowledgement from a remote peer.
|
||||||
|
INITIAL PeeringState = 1
|
||||||
|
// Active means that the peering connection is active and healthy.
|
||||||
|
// ACTIVE PeeringState = 2
|
||||||
|
)
|
||||||
|
|
||||||
|
type Peering struct {
|
||||||
|
// ID is a datacenter-scoped UUID for the peering.
|
||||||
|
ID string
|
||||||
|
// Name is the local alias for the peering relationship.
|
||||||
|
Name string
|
||||||
|
// Partition is the local partition connecting to the peer.
|
||||||
|
Partition string `json:"Partition,omitempty"`
|
||||||
|
// State is one of the valid PeeringState values to represent the status of
|
||||||
|
// peering relationship.
|
||||||
|
State PeeringState
|
||||||
|
// PeerID is the ID that our peer assigned to this peering.
|
||||||
|
// This ID is to be used when dialing the peer, so that it can know who dialed it.
|
||||||
|
PeerID string
|
||||||
|
// PeerCAPems contains all the CA certificates for the remote peer.
|
||||||
|
PeerCAPems []string
|
||||||
|
// PeerServerName is the name of the remote server as it relates to TLS.
|
||||||
|
PeerServerName string
|
||||||
|
// PeerServerAddresses contains all the connection addresses for the remote peer.
|
||||||
|
PeerServerAddresses []string
|
||||||
|
// CreateIndex is the Raft index at which the Peering was created.
|
||||||
|
CreateIndex uint64
|
||||||
|
// ModifyIndex is the latest Raft index at which the Peering. was modified.
|
||||||
|
ModifyIndex uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// PeeringRequest is used for Read and Delete HTTP calls.
|
||||||
|
// The PeeringReadRequest and PeeringDeleteRequest look the same, so we treat them the same for now
|
||||||
|
type PeeringRequest struct {
|
||||||
|
Name string
|
||||||
|
Partition string `json:"Partition,omitempty"`
|
||||||
|
Datacenter string
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeeringReadResponse struct {
|
||||||
|
Peering *Peering
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeeringDeleteResponse struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeeringGenerateTokenRequest struct {
|
||||||
|
// PeerName is the name of the remote peer.
|
||||||
|
PeerName string
|
||||||
|
// Partition to be peered.
|
||||||
|
Partition string `json:"Partition,omitempty"`
|
||||||
|
Datacenter string
|
||||||
|
Token string
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeeringGenerateTokenResponse struct {
|
||||||
|
// PeeringToken is an opaque string provided to the remote peer for it to complete
|
||||||
|
// the peering initialization handshake.
|
||||||
|
PeeringToken string
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeeringInitiateRequest struct {
|
||||||
|
// Name of the remote peer.
|
||||||
|
PeerName string
|
||||||
|
// The peering token returned from the peer's GenerateToken endpoint.
|
||||||
|
PeeringToken string
|
||||||
|
Datacenter string
|
||||||
|
Token string
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeeringInitiateResponse struct {
|
||||||
|
Status uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
type Peerings struct {
|
||||||
|
c *Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Peerings returns a handle to the operator endpoints.
|
||||||
|
func (c *Client) Peerings() *Peerings {
|
||||||
|
return &Peerings{c: c}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Peerings) Read(ctx context.Context, name string, q *QueryOptions) (*Peering, *QueryMeta, error) {
|
||||||
|
if name == "" {
|
||||||
|
return nil, nil, fmt.Errorf("peering name cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
req := p.c.newRequest("GET", fmt.Sprintf("/v1/peering/%s", name))
|
||||||
|
req.setQueryOptions(q)
|
||||||
|
req.ctx = ctx
|
||||||
|
|
||||||
|
rtt, resp, err := p.c.doRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
defer closeResponseBody(resp)
|
||||||
|
if err := requireOK(resp); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
qm := &QueryMeta{}
|
||||||
|
parseQueryMeta(resp, qm)
|
||||||
|
qm.RequestTime = rtt
|
||||||
|
|
||||||
|
var out Peering
|
||||||
|
if err := decodeBody(resp, &out); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &out, qm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Peerings) Delete(ctx context.Context, peeringReq PeeringRequest, q *QueryOptions) (*PeeringDeleteResponse, *QueryMeta, error) {
|
||||||
|
if peeringReq.Name == "" {
|
||||||
|
return nil, nil, fmt.Errorf("peering name cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
req := p.c.newRequest("DELETE", fmt.Sprintf("/v1/peering/%s", peeringReq.Name))
|
||||||
|
req.setQueryOptions(q)
|
||||||
|
req.obj = peeringReq
|
||||||
|
req.ctx = ctx
|
||||||
|
|
||||||
|
rtt, resp, err := p.c.doRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
defer closeResponseBody(resp)
|
||||||
|
if err := requireOK(resp); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
qm := &QueryMeta{}
|
||||||
|
parseQueryMeta(resp, qm)
|
||||||
|
qm.RequestTime = rtt
|
||||||
|
|
||||||
|
var out PeeringDeleteResponse
|
||||||
|
if err := decodeBody(resp, &out); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &out, qm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Peerings) GenerateToken(ctx context.Context, g PeeringGenerateTokenRequest, wq *WriteOptions) (*PeeringGenerateTokenResponse, *WriteMeta, error) {
|
||||||
|
if g.PeerName == "" {
|
||||||
|
return nil, nil, fmt.Errorf("peer name cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
req := p.c.newRequest("POST", fmt.Sprint("/v1/peering/token"))
|
||||||
|
req.setWriteOptions(wq)
|
||||||
|
req.ctx = ctx
|
||||||
|
req.obj = g
|
||||||
|
|
||||||
|
rtt, resp, err := p.c.doRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
defer closeResponseBody(resp)
|
||||||
|
if err := requireOK(resp); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
wm := &WriteMeta{RequestTime: rtt}
|
||||||
|
|
||||||
|
var out PeeringGenerateTokenResponse
|
||||||
|
if err := decodeBody(resp, &out); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &out, wm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Peerings) Initiate(ctx context.Context, i PeeringInitiateRequest, wq *WriteOptions) (*PeeringInitiateResponse, *WriteMeta, error) {
|
||||||
|
|
||||||
|
req := p.c.newRequest("POST", fmt.Sprint("/v1/peering/initiate"))
|
||||||
|
req.setWriteOptions(wq)
|
||||||
|
req.ctx = ctx
|
||||||
|
req.obj = i
|
||||||
|
|
||||||
|
rtt, resp, err := p.c.doRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
defer closeResponseBody(resp)
|
||||||
|
if err := requireOK(resp); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
wm := &WriteMeta{RequestTime: rtt}
|
||||||
|
|
||||||
|
var out PeeringInitiateResponse
|
||||||
|
if err := decodeBody(resp, &out); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &out, wm, nil
|
||||||
|
}
|
|
@ -0,0 +1,112 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO(peering): cover the following test cases: bad/ malformed input, peering with wrong token,
|
||||||
|
// peering with the wrong PeerName
|
||||||
|
|
||||||
|
// TestAPI_Peering_GenerateToken_Read_Initiate_Delete tests the following use case:
|
||||||
|
// a server creates a peering token, reads the token, then another server calls initiate peering
|
||||||
|
// finally, we delete the token on the first server
|
||||||
|
func TestAPI_Peering_GenerateToken_Read_Initiate_Delete(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
c, s := makeClientWithCA(t)
|
||||||
|
defer s.Stop()
|
||||||
|
s.WaitForSerfCheck(t)
|
||||||
|
options := &WriteOptions{Datacenter: "dc1"}
|
||||||
|
ctx := context.Background()
|
||||||
|
peerings := c.Peerings()
|
||||||
|
|
||||||
|
p1 := PeeringGenerateTokenRequest{
|
||||||
|
PeerName: "peer1",
|
||||||
|
}
|
||||||
|
var token1 string
|
||||||
|
// Generate a token happy path
|
||||||
|
resp, wm, err := peerings.GenerateToken(ctx, p1, options)
|
||||||
|
token1 = resp.PeeringToken
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEmpty(t, wm)
|
||||||
|
require.NotEmpty(t, resp)
|
||||||
|
|
||||||
|
// Read token generated on server
|
||||||
|
resp2, qm, err2 := peerings.Read(ctx, "peer1", nil)
|
||||||
|
|
||||||
|
// basic ok checking
|
||||||
|
require.NoError(t, err2)
|
||||||
|
require.NotEmpty(t, qm)
|
||||||
|
require.NotEmpty(t, resp2)
|
||||||
|
|
||||||
|
// token specific assertions on the "server"
|
||||||
|
require.Equal(t, "peer1", resp2.Name)
|
||||||
|
|
||||||
|
// TODO(peering) -- split in OSS/ ENT test for "default" vs ""; or revisit PartitionOrEmpty vs PartitionOrDefault
|
||||||
|
// require.Equal(t, "default", resp2.Partition)
|
||||||
|
require.Equal(t, INITIAL, resp2.State)
|
||||||
|
|
||||||
|
// Initiate peering
|
||||||
|
|
||||||
|
// make a "client" server in second DC for peering
|
||||||
|
c2, s2 := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
|
||||||
|
conf.Datacenter = "dc2"
|
||||||
|
})
|
||||||
|
defer s2.Stop()
|
||||||
|
|
||||||
|
i := PeeringInitiateRequest{
|
||||||
|
Datacenter: c2.config.Datacenter,
|
||||||
|
PeerName: "peer1",
|
||||||
|
PeeringToken: token1,
|
||||||
|
}
|
||||||
|
|
||||||
|
respi, wm3, err3 := c2.Peerings().Initiate(ctx, i, options)
|
||||||
|
|
||||||
|
// basic checks
|
||||||
|
require.NoError(t, err3)
|
||||||
|
require.NotEmpty(t, wm3)
|
||||||
|
|
||||||
|
// at first the token will be undefined
|
||||||
|
require.Equal(t, UNDEFINED, PeeringState(respi.Status))
|
||||||
|
|
||||||
|
// wait for the peering backend to finish the peering connection
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
respr, qm2, err4 := c2.Peerings().Read(ctx, "peer1", nil)
|
||||||
|
|
||||||
|
// basic ok checking
|
||||||
|
require.NoError(t, err4)
|
||||||
|
require.NotEmpty(t, qm2)
|
||||||
|
|
||||||
|
// require that the peering state is not undefined
|
||||||
|
require.Equal(t, INITIAL, respr.State)
|
||||||
|
|
||||||
|
// TODO(peering) -- let's go all the way and test in code either here or somewhere else that PeeringState does move to Active
|
||||||
|
// require.Equal(t, PeeringState_ACTIVE, respr.State)
|
||||||
|
|
||||||
|
// Delete the token on server 1
|
||||||
|
p := PeeringRequest{
|
||||||
|
Name: "peer1",
|
||||||
|
}
|
||||||
|
resp4, qm3, err5 := peerings.Delete(ctx, p, nil)
|
||||||
|
|
||||||
|
require.NoError(t, err5)
|
||||||
|
require.NotEmpty(t, qm3)
|
||||||
|
|
||||||
|
// {} is returned on success for now
|
||||||
|
require.Empty(t, resp4)
|
||||||
|
|
||||||
|
// Read to see if the token is "gone"
|
||||||
|
resp5, qm4, err6 := peerings.Read(ctx, "peer1", nil)
|
||||||
|
|
||||||
|
// basic checks
|
||||||
|
require.NotNil(t, err6)
|
||||||
|
require.Empty(t, qm4)
|
||||||
|
require.Empty(t, resp5)
|
||||||
|
}
|
Loading…
Reference in New Issue