Leadership transfer cmd (#14132)

* add leadership transfer command

* add RPC call test (flaky)

* add missing import

* add changelog

* add command registration

* Apply suggestions from code review

Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>

* add the possibility of providing an id to raft leadership transfer. Add few tests.

* delete old file from cherry pick

* rename changelog filename to PR #

* rename changelog and fix import

* fix failing test

* check for OperatorWrite

Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>

* rename from leader-transfer to transfer-leader

* remove version check and add test for operator read

* move struct to operator.go

* first pass

* add code for leader transfer in the grpc backend and tests

* wire the http endpoint to the new grpc endpoint

* remove the RPC endpoint

* remove non needed struct

* fix naming

* add mog glue to API

* fix comment

* remove dead code

* fix linter error

* change package name for proto file

* remove error wrapping

* fix failing test

* add command registration

* add grpc service mock tests

* fix receiver to be pointer

* use defined values

Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>

* reuse MockAclAuthorizer

* add documentation

* remove usage of external.TokenFromContext

* fix failing tests

* fix proto generation

* Apply suggestions from code review

Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com>

* Apply suggestions from code review

* add more context in doc for the reason

* Apply suggestions from docs code review

Co-authored-by: Jeff Boruszak <104028618+boruszak@users.noreply.github.com>

* regenerate proto

* fix linter errors

Co-authored-by: github-team-consul-core <github-team-consul-core@hashicorp.com>
Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>
Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com>
Co-authored-by: Jeff Boruszak <104028618+boruszak@users.noreply.github.com>
This commit is contained in:
Dhia Ayachi 2022-11-14 15:35:12 -05:00 committed by GitHub
parent b3eaab3989
commit 219a3c5bd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1397 additions and 237 deletions

3
.changelog/14132.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:enhancement
raft: add an operator api endpoint and a command to initiate raft leadership transfer.
```

223
acl/MockAuthorizer.go Normal file
View File

@ -0,0 +1,223 @@
package acl
import "github.com/stretchr/testify/mock"
type MockAuthorizer struct {
mock.Mock
}
var _ Authorizer = (*MockAuthorizer)(nil)
// ACLRead checks for permission to list all the ACLs
func (m *MockAuthorizer) ACLRead(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// ACLWrite checks for permission to manipulate ACLs
func (m *MockAuthorizer) ACLWrite(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// AgentRead checks for permission to read from agent endpoints for a
// given node.
func (m *MockAuthorizer) AgentRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// AgentWrite checks for permission to make changes via agent endpoints
// for a given node.
func (m *MockAuthorizer) AgentWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// EventRead determines if a specific event can be queried.
func (m *MockAuthorizer) EventRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// EventWrite determines if a specific event may be fired.
func (m *MockAuthorizer) EventWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// IntentionDefaultAllow determines the default authorized behavior
// when no intentions match a Connect request.
func (m *MockAuthorizer) IntentionDefaultAllow(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// IntentionRead determines if a specific intention can be read.
func (m *MockAuthorizer) IntentionRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// IntentionWrite determines if a specific intention can be
// created, modified, or deleted.
func (m *MockAuthorizer) IntentionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyList checks for permission to list keys under a prefix
func (m *MockAuthorizer) KeyList(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyRead checks for permission to read a given key
func (m *MockAuthorizer) KeyRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyWrite checks for permission to write a given key
func (m *MockAuthorizer) KeyWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyWritePrefix checks for permission to write to an
// entire key prefix. This means there must be no sub-policies
// that deny a write.
func (m *MockAuthorizer) KeyWritePrefix(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyringRead determines if the encryption keyring used in
// the gossip layer can be read.
func (m *MockAuthorizer) KeyringRead(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyringWrite determines if the keyring can be manipulated
func (m *MockAuthorizer) KeyringWrite(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// NodeRead checks for permission to read (discover) a given node.
func (m *MockAuthorizer) NodeRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
func (m *MockAuthorizer) NodeReadAll(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// NodeWrite checks for permission to create or update (register) a
// given node.
func (m *MockAuthorizer) NodeWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
func (m *MockAuthorizer) MeshRead(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
func (m *MockAuthorizer) MeshWrite(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// PeeringRead determines if the read-only Consul peering functions
// can be used.
func (m *MockAuthorizer) PeeringRead(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// PeeringWrite determines if the state-changing Consul peering
// functions can be used.
func (m *MockAuthorizer) PeeringWrite(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// OperatorRead determines if the read-only Consul operator functions
// can be used. ret := m.Called(segment, ctx)
func (m *MockAuthorizer) OperatorRead(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// OperatorWrite determines if the state-changing Consul operator
// functions can be used.
func (m *MockAuthorizer) OperatorWrite(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// PreparedQueryRead determines if a specific prepared query can be read
// to show its contents (this is not used for execution).
func (m *MockAuthorizer) PreparedQueryRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// PreparedQueryWrite determines if a specific prepared query can be
// created, modified, or deleted.
func (m *MockAuthorizer) PreparedQueryWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// ServiceRead checks for permission to read a given service
func (m *MockAuthorizer) ServiceRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
func (m *MockAuthorizer) ServiceReadAll(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// ServiceWrite checks for permission to create or update a given
// service
func (m *MockAuthorizer) ServiceWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// ServiceWriteAny checks for service:write on any service
func (m *MockAuthorizer) ServiceWriteAny(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// SessionRead checks for permission to read sessions for a given node.
func (m *MockAuthorizer) SessionRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// SessionWrite checks for permission to create sessions for a given
// node.
func (m *MockAuthorizer) SessionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// Snapshot checks for permission to take and restore snapshots.
func (m *MockAuthorizer) Snapshot(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
func (p *MockAuthorizer) ToAllowAuthorizer() AllowAuthorizer {
return AllowAuthorizer{Authorizer: p}
}

View File

@ -4,230 +4,9 @@ import (
"fmt" "fmt"
"testing" "testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type mockAuthorizer struct {
mock.Mock
}
var _ Authorizer = (*mockAuthorizer)(nil)
// ACLRead checks for permission to list all the ACLs
func (m *mockAuthorizer) ACLRead(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// ACLWrite checks for permission to manipulate ACLs
func (m *mockAuthorizer) ACLWrite(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// AgentRead checks for permission to read from agent endpoints for a
// given node.
func (m *mockAuthorizer) AgentRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// AgentWrite checks for permission to make changes via agent endpoints
// for a given node.
func (m *mockAuthorizer) AgentWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// EventRead determines if a specific event can be queried.
func (m *mockAuthorizer) EventRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// EventWrite determines if a specific event may be fired.
func (m *mockAuthorizer) EventWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// IntentionDefaultAllow determines the default authorized behavior
// when no intentions match a Connect request.
func (m *mockAuthorizer) IntentionDefaultAllow(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// IntentionRead determines if a specific intention can be read.
func (m *mockAuthorizer) IntentionRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// IntentionWrite determines if a specific intention can be
// created, modified, or deleted.
func (m *mockAuthorizer) IntentionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyList checks for permission to list keys under a prefix
func (m *mockAuthorizer) KeyList(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyRead checks for permission to read a given key
func (m *mockAuthorizer) KeyRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyWrite checks for permission to write a given key
func (m *mockAuthorizer) KeyWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyWritePrefix checks for permission to write to an
// entire key prefix. This means there must be no sub-policies
// that deny a write.
func (m *mockAuthorizer) KeyWritePrefix(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyringRead determines if the encryption keyring used in
// the gossip layer can be read.
func (m *mockAuthorizer) KeyringRead(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// KeyringWrite determines if the keyring can be manipulated
func (m *mockAuthorizer) KeyringWrite(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// NodeRead checks for permission to read (discover) a given node.
func (m *mockAuthorizer) NodeRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
func (m *mockAuthorizer) NodeReadAll(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// NodeWrite checks for permission to create or update (register) a
// given node.
func (m *mockAuthorizer) NodeWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
func (m *mockAuthorizer) MeshRead(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
func (m *mockAuthorizer) MeshWrite(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// PeeringRead determines if the read-only Consul peering functions
// can be used.
func (m *mockAuthorizer) PeeringRead(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// PeeringWrite determines if the state-changing Consul peering
// functions can be used.
func (m *mockAuthorizer) PeeringWrite(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// OperatorRead determines if the read-only Consul operator functions
// can be used. ret := m.Called(segment, ctx)
func (m *mockAuthorizer) OperatorRead(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// OperatorWrite determines if the state-changing Consul operator
// functions can be used.
func (m *mockAuthorizer) OperatorWrite(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// PreparedQueryRead determines if a specific prepared query can be read
// to show its contents (this is not used for execution).
func (m *mockAuthorizer) PreparedQueryRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// PreparedQueryWrite determines if a specific prepared query can be
// created, modified, or deleted.
func (m *mockAuthorizer) PreparedQueryWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// ServiceRead checks for permission to read a given service
func (m *mockAuthorizer) ServiceRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
func (m *mockAuthorizer) ServiceReadAll(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// ServiceWrite checks for permission to create or update a given
// service
func (m *mockAuthorizer) ServiceWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// ServiceWriteAny checks for service:write on any service
func (m *mockAuthorizer) ServiceWriteAny(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
// SessionRead checks for permission to read sessions for a given node.
func (m *mockAuthorizer) SessionRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// SessionWrite checks for permission to create sessions for a given
// node.
func (m *mockAuthorizer) SessionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(segment, ctx)
return ret.Get(0).(EnforcementDecision)
}
// Snapshot checks for permission to take and restore snapshots.
func (m *mockAuthorizer) Snapshot(ctx *AuthorizerContext) EnforcementDecision {
ret := m.Called(ctx)
return ret.Get(0).(EnforcementDecision)
}
func (p *mockAuthorizer) ToAllowAuthorizer() AllowAuthorizer {
return AllowAuthorizer{Authorizer: p}
}
func TestACL_Enforce(t *testing.T) { func TestACL_Enforce(t *testing.T) {
type testCase struct { type testCase struct {
method string method string
@ -664,7 +443,7 @@ func TestACL_Enforce(t *testing.T) {
for _, tcase := range cases { for _, tcase := range cases {
t.Run(testName(tcase), func(t *testing.T) { t.Run(testName(tcase), func(t *testing.T) {
m := &mockAuthorizer{} m := &MockAuthorizer{}
if tcase.err == "" { if tcase.err == "" {
var nilCtx *AuthorizerContext var nilCtx *AuthorizerContext

View File

@ -16,7 +16,7 @@ func TestPermissionDeniedError(t *testing.T) {
return t.expected return t.expected
} }
auth1 := mockAuthorizer{} auth1 := MockAuthorizer{}
cases := []testCase{ cases := []testCase{
{ {

View File

@ -5,6 +5,7 @@ import (
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/hashicorp/consul/proto/pboperator"
"io" "io"
"net" "net"
"net/http" "net/http"
@ -382,6 +383,8 @@ type Agent struct {
rpcClientPeering pbpeering.PeeringServiceClient rpcClientPeering pbpeering.PeeringServiceClient
rpcClientOperator pboperator.OperatorServiceClient
// routineManager is responsible for managing longer running go routines // routineManager is responsible for managing longer running go routines
// run by the Agent // run by the Agent
routineManager *routine.Manager routineManager *routine.Manager
@ -467,6 +470,7 @@ func New(bd BaseDeps) (*Agent, error) {
} }
a.rpcClientPeering = pbpeering.NewPeeringServiceClient(conn) a.rpcClientPeering = pbpeering.NewPeeringServiceClient(conn)
a.rpcClientOperator = pboperator.NewOperatorServiceClient(conn)
a.serviceManager = NewServiceManager(&a) a.serviceManager = NewServiceManager(&a)

View File

@ -0,0 +1,34 @@
package consul
import (
"context"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/rpc/operator"
"github.com/hashicorp/consul/proto/pboperator"
"github.com/hashicorp/raft"
)
type OperatorBackend struct {
srv *Server
}
// NewOperatorBackend returns a operator.Backend implementation that is bound to the given server.
func NewOperatorBackend(srv *Server) *OperatorBackend {
return &OperatorBackend{
srv: srv,
}
}
func (op *OperatorBackend) ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error) {
return op.srv.ResolveTokenAndDefaultMeta(token, entMeta, authzCtx)
}
func (op *OperatorBackend) TransferLeader(_ context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error) {
reply := new(pboperator.TransferLeaderResponse)
err := op.srv.attemptLeadershipTransfer(raft.ServerID(request.ID))
reply.Success = err == nil
return reply, err
}
var _ operator.Backend = (*OperatorBackend)(nil)

View File

@ -0,0 +1,193 @@
package consul
import (
"context"
"github.com/hashicorp/consul/acl"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pboperator"
"github.com/hashicorp/consul/sdk/testutil/retry"
"google.golang.org/grpc/credentials/insecure"
"testing"
"time"
"github.com/stretchr/testify/require"
gogrpc "google.golang.org/grpc"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/testrpc"
)
func TestOperatorBackend_TransferLeader(t *testing.T) {
t.Parallel()
conf := testClusterConfig{
Datacenter: "dc1",
Servers: 3,
ServerConf: func(config *Config) {
config.RaftConfig.HeartbeatTimeout = 2 * time.Second
config.RaftConfig.ElectionTimeout = 2 * time.Second
config.RaftConfig.LeaderLeaseTimeout = 1 * time.Second
},
}
nodes := newTestCluster(t, &conf)
s1 := nodes.Servers[0]
// Make sure a leader is elected
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Make a write call to server2 and make sure it gets forwarded to server1
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
// Dial server2 directly
conn, err := gogrpc.DialContext(ctx, s1.config.RPCAddr.String(),
gogrpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
gogrpc.WithTransportCredentials(insecure.NewCredentials()),
gogrpc.WithBlock())
require.NoError(t, err)
t.Cleanup(func() { conn.Close() })
operatorClient := pboperator.NewOperatorServiceClient(conn)
testutil.RunStep(t, "transfer leader", func(t *testing.T) {
beforeLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(t, beforeLeader)
// Do the grpc Write call to server2
req := pboperator.TransferLeaderRequest{
ID: "",
}
reply, err := operatorClient.TransferLeader(ctx, &req)
require.NoError(t, err)
require.True(t, reply.Success)
time.Sleep(1 * time.Second)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
afterLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(r, afterLeader)
})
afterLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(t, afterLeader)
if afterLeader == beforeLeader {
t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader)
}
})
}
func TestOperatorBackend_TransferLeaderWithACL(t *testing.T) {
t.Parallel()
conf := testClusterConfig{
Datacenter: "dc1",
Servers: 3,
ServerConf: func(config *Config) {
config.RaftConfig.HeartbeatTimeout = 2 * time.Second
config.RaftConfig.ElectionTimeout = 2 * time.Second
config.RaftConfig.LeaderLeaseTimeout = 1 * time.Second
config.ACLsEnabled = true
config.ACLInitialManagementToken = "root"
config.ACLResolverSettings.ACLDefaultPolicy = "deny"
},
}
nodes := newTestCluster(t, &conf)
s1 := nodes.Servers[0]
// Make sure a leader is elected
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Make a write call to server2 and make sure it gets forwarded to server1
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
// Dial server2 directly
conn, err := gogrpc.DialContext(ctx, s1.config.RPCAddr.String(),
gogrpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
gogrpc.WithTransportCredentials(insecure.NewCredentials()),
gogrpc.WithBlock())
require.NoError(t, err)
t.Cleanup(func() { conn.Close() })
operatorClient := pboperator.NewOperatorServiceClient(conn)
testutil.RunStep(t, "transfer leader no token", func(t *testing.T) {
beforeLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(t, beforeLeader)
// Do the grpc Write call to server2
req := pboperator.TransferLeaderRequest{
ID: "",
}
reply, err := operatorClient.TransferLeader(ctx, &req)
require.True(t, acl.IsErrPermissionDenied(err))
require.Nil(t, reply)
time.Sleep(1 * time.Second)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
afterLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(r, afterLeader)
})
afterLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(t, afterLeader)
if afterLeader != beforeLeader {
t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader)
}
})
testutil.RunStep(t, "transfer leader operator read token", func(t *testing.T) {
beforeLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(t, beforeLeader)
// Do the grpc Write call to server2
req := pboperator.TransferLeaderRequest{
ID: "",
}
codec := rpcClient(t, s1)
rules := `operator = "read"`
tokenRead := createToken(t, codec, rules)
ctxToken, err := external.ContextWithQueryOptions(ctx, structs.QueryOptions{Token: tokenRead})
require.NoError(t, err)
reply, err := operatorClient.TransferLeader(ctxToken, &req)
require.True(t, acl.IsErrPermissionDenied(err))
require.Nil(t, reply)
time.Sleep(1 * time.Second)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
afterLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(r, afterLeader)
})
afterLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(t, afterLeader)
if afterLeader != beforeLeader {
t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader)
}
})
testutil.RunStep(t, "transfer leader operator write token", func(t *testing.T) {
beforeLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(t, beforeLeader)
// Do the grpc Write call to server2
req := pboperator.TransferLeaderRequest{
ID: "",
}
codec := rpcClient(t, s1)
rules := `operator = "write"`
tokenWrite := createTokenWithPolicyNameFull(t, codec, "the-policy-write", rules, "root")
ctxToken, err := external.ContextWithQueryOptions(ctx, structs.QueryOptions{Token: tokenWrite.SecretID})
require.NoError(t, err)
reply, err := operatorClient.TransferLeader(ctxToken, &req)
require.NoError(t, err)
require.True(t, reply.Success)
time.Sleep(1 * time.Second)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
afterLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(r, afterLeader)
})
afterLeader, _ := s1.raft.LeaderWithID()
require.NotEmpty(t, afterLeader)
if afterLeader == beforeLeader {
t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader)
}
})
}

View File

@ -2,6 +2,7 @@ package consul
import ( import (
"fmt" "fmt"
"net" "net"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"

View File

@ -21,7 +21,6 @@ import (
"github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot" autopilot "github.com/hashicorp/raft-autopilot"
raftboltdb "github.com/hashicorp/raft-boltdb/v2" raftboltdb "github.com/hashicorp/raft-boltdb/v2"
@ -51,6 +50,7 @@ import (
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/rpc/operator"
"github.com/hashicorp/consul/agent/rpc/peering" "github.com/hashicorp/consul/agent/rpc/peering"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
@ -370,6 +370,9 @@ type Server struct {
// peeringBackend is shared between the external and internal gRPC services for peering // peeringBackend is shared between the external and internal gRPC services for peering
peeringBackend *PeeringBackend peeringBackend *PeeringBackend
// operatorBackend is shared between the external and internal gRPC services for peering
operatorBackend *OperatorBackend
// peerStreamServer is a server used to handle peering streams from external clusters. // peerStreamServer is a server used to handle peering streams from external clusters.
peerStreamServer *peerstream.Server peerStreamServer *peerstream.Server
@ -385,6 +388,7 @@ type Server struct {
// embedded struct to hold all the enterprise specific data // embedded struct to hold all the enterprise specific data
EnterpriseServer EnterpriseServer
operatorServer *operator.Server
} }
type connHandler interface { type connHandler interface {
Run() error Run() error
@ -739,6 +743,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
}).Register(s.externalGRPCServer) }).Register(s.externalGRPCServer)
s.peeringBackend = NewPeeringBackend(s) s.peeringBackend = NewPeeringBackend(s)
s.operatorBackend = NewOperatorBackend(s)
s.peerStreamServer = peerstream.NewServer(peerstream.Config{ s.peerStreamServer = peerstream.NewServer(peerstream.Config{
Backend: s.peeringBackend, Backend: s.peeringBackend,
GetStore: func() peerstream.StateStore { return s.FSM().State() }, GetStore: func() peerstream.StateStore { return s.FSM().State() },
@ -826,6 +831,19 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
PeeringEnabled: config.PeeringEnabled, PeeringEnabled: config.PeeringEnabled,
}) })
s.peeringServer = p s.peeringServer = p
o := operator.NewServer(operator.Config{
Backend: s.operatorBackend,
Logger: deps.Logger.Named("grpc-api.operator"),
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
// Only forward the request if the dc in the request matches the server's datacenter.
if info.RequestDatacenter() != "" && info.RequestDatacenter() != config.Datacenter {
return false, fmt.Errorf("requests to transfer leader cannot be forwarded to remote datacenters")
}
return s.ForwardGRPC(s.grpcConnPool, info, fn)
},
Datacenter: config.Datacenter,
})
s.operatorServer = o
register := func(srv *grpc.Server) { register := func(srv *grpc.Server) {
if config.RPCConfig.EnableStreaming { if config.RPCConfig.EnableStreaming {
@ -834,6 +852,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
deps.Logger.Named("grpc-api.subscription"))) deps.Logger.Named("grpc-api.subscription")))
} }
s.peeringServer.Register(srv) s.peeringServer.Register(srv)
s.operatorServer.Register(srv)
s.registerEnterpriseGRPCServices(deps, srv) s.registerEnterpriseGRPCServices(deps, srv)
// Note: these external gRPC services are also exposed on the internal server to // Note: these external gRPC services are also exposed on the internal server to
@ -1193,20 +1212,25 @@ func (s *Server) Shutdown() error {
return nil return nil
} }
func (s *Server) attemptLeadershipTransfer() (success bool) { func (s *Server) attemptLeadershipTransfer(id raft.ServerID) (err error) {
leadershipTransferVersion := version.Must(version.NewVersion(LeaderTransferMinVersion)) var addr raft.ServerAddress
if id != "" {
ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, leadershipTransferVersion) addr, err = s.serverLookup.ServerAddr(id)
if !ok { if err != nil {
return false return err
} }
future := s.raft.LeadershipTransferToServer(id, addr)
if err := future.Error(); err != nil {
return err
}
} else {
future := s.raft.LeadershipTransfer() future := s.raft.LeadershipTransfer()
if err := future.Error(); err != nil { if err := future.Error(); err != nil {
s.logger.Error("failed to transfer leadership, removing the server", "error", err) return err
return false
} }
return true }
return nil
} }
// Leave is used to prepare for a graceful shutdown. // Leave is used to prepare for a graceful shutdown.
@ -1228,7 +1252,7 @@ func (s *Server) Leave() error {
// removed for some reasonable period of time. // removed for some reasonable period of time.
isLeader := s.IsLeader() isLeader := s.IsLeader()
if isLeader && numPeers > 1 { if isLeader && numPeers > 1 {
if s.attemptLeadershipTransfer() { if err := s.attemptLeadershipTransfer(""); err == nil {
isLeader = false isLeader = false
} else { } else {
future := s.raft.RemoveServer(raft.ServerID(s.config.NodeID), 0, 0) future := s.raft.RemoveServer(raft.ServerID(s.config.NodeID), 0, 0)

View File

@ -99,6 +99,7 @@ func init() {
registerEndpoint("/v1/internal/acl/authorize", []string{"POST"}, (*HTTPHandlers).ACLAuthorize) registerEndpoint("/v1/internal/acl/authorize", []string{"POST"}, (*HTTPHandlers).ACLAuthorize)
registerEndpoint("/v1/kv/", []string{"GET", "PUT", "DELETE"}, (*HTTPHandlers).KVSEndpoint) registerEndpoint("/v1/kv/", []string{"GET", "PUT", "DELETE"}, (*HTTPHandlers).KVSEndpoint)
registerEndpoint("/v1/operator/raft/configuration", []string{"GET"}, (*HTTPHandlers).OperatorRaftConfiguration) registerEndpoint("/v1/operator/raft/configuration", []string{"GET"}, (*HTTPHandlers).OperatorRaftConfiguration)
registerEndpoint("/v1/operator/raft/transfer-leader", []string{"POST"}, (*HTTPHandlers).OperatorRaftTransferLeader)
registerEndpoint("/v1/operator/raft/peer", []string{"DELETE"}, (*HTTPHandlers).OperatorRaftPeer) registerEndpoint("/v1/operator/raft/peer", []string{"DELETE"}, (*HTTPHandlers).OperatorRaftPeer)
registerEndpoint("/v1/operator/keyring", []string{"GET", "POST", "PUT", "DELETE"}, (*HTTPHandlers).OperatorKeyringEndpoint) registerEndpoint("/v1/operator/keyring", []string{"GET", "POST", "PUT", "DELETE"}, (*HTTPHandlers).OperatorKeyringEndpoint)
registerEndpoint("/v1/operator/autopilot/configuration", []string{"GET", "PUT"}, (*HTTPHandlers).OperatorAutopilotConfiguration) registerEndpoint("/v1/operator/autopilot/configuration", []string{"GET", "PUT"}, (*HTTPHandlers).OperatorAutopilotConfiguration)

View File

@ -2,6 +2,8 @@ package agent
import ( import (
"fmt" "fmt"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/proto/pboperator"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
@ -31,6 +33,43 @@ func (s *HTTPHandlers) OperatorRaftConfiguration(resp http.ResponseWriter, req *
return reply, nil return reply, nil
} }
// OperatorRaftTransferLeader is used to transfer raft cluster leadership to another node
func (s *HTTPHandlers) OperatorRaftTransferLeader(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var entMeta acl.EnterpriseMeta
if err := s.parseEntMetaPartition(req, &entMeta); err != nil {
return nil, err
}
params := req.URL.Query()
_, hasID := params["id"]
ID := ""
if hasID {
ID = params.Get("id")
}
args := pboperator.TransferLeaderRequest{
ID: ID,
}
var token string
s.parseToken(req, &token)
ctx, err := external.ContextWithQueryOptions(req.Context(), structs.QueryOptions{Token: token})
if err != nil {
return nil, err
}
result, err := s.agent.rpcClientOperator.TransferLeader(ctx, &args)
if err != nil {
return nil, err
}
if result.Success != true {
return nil, HTTPError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("Failed to transfer Leader: %s", err.Error())}
}
reply := new(api.TransferLeaderResponse)
pboperator.TransferLeaderResponseToAPI(result, reply)
return reply, nil
}
// OperatorRaftPeer supports actions on Raft peers. Currently we only support // OperatorRaftPeer supports actions on Raft peers. Currently we only support
// removing peers by address. // removing peers by address.
func (s *HTTPHandlers) OperatorRaftPeer(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPHandlers) OperatorRaftPeer(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

View File

@ -0,0 +1,103 @@
package operator
import (
"context"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pboperator"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
)
// For private/internal gRPC handlers, protoc-gen-rpc-glue generates the
// requisite methods to satisfy the structs.RPCInfo interface using fields
// from the pbcommon package. This service is public, so we can't use those
// fields in our proto definition. Instead, we construct our RPCInfo manually.
var writeRequest struct {
structs.WriteRequest
structs.DCSpecificRequest
}
var readRequest struct {
structs.QueryOptions
structs.DCSpecificRequest
}
// Server implements pboperator.OperatorService to provide RPC operations for
// managing operator operation.
type Server struct {
Config
}
func (s *Server) TransferLeader(ctx context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error) {
resp := &pboperator.TransferLeaderResponse{Success: false}
handled, err := s.ForwardRPC(&writeRequest, func(conn *grpc.ClientConn) error {
ctx := external.ForwardMetadataContext(ctx)
var err error
resp, err = pboperator.NewOperatorServiceClient(conn).TransferLeader(ctx, request)
return err
})
if handled || err != nil {
return resp, err
}
var authzCtx acl.AuthorizerContext
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
options, err := external.QueryOptionsFromContext(ctx)
if err != nil {
return nil, err
}
authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx)
if err != nil {
return resp, err
}
if err := authz.ToAllowAuthorizer().OperatorWriteAllowed(&authzCtx); err != nil {
return resp, err
}
return s.Backend.TransferLeader(ctx, request)
}
type Config struct {
Backend Backend
Logger hclog.Logger
ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error)
Datacenter string
}
func NewServer(cfg Config) *Server {
requireNotNil(cfg.Backend, "Backend")
requireNotNil(cfg.Logger, "Logger")
requireNotNil(cfg.ForwardRPC, "ForwardRPC")
if cfg.Datacenter == "" {
panic("Datacenter is required")
}
return &Server{
Config: cfg,
}
}
func requireNotNil(v interface{}, name string) {
if v == nil {
panic(name + " is required")
}
}
var _ pboperator.OperatorServiceServer = (*Server)(nil)
func (s *Server) Register(grpcServer *grpc.Server) {
pboperator.RegisterOperatorServiceServer(grpcServer, s)
}
// Backend defines the core integrations the Operator endpoint depends on. A
// functional implementation will integrate with various operator operation such as
// raft, autopilot operation. The only currently implemented operation is raft leader transfer
type Backend interface {
TransferLeader(ctx context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error)
ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error)
}

View File

@ -0,0 +1,104 @@
package operator
import (
"context"
"fmt"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pboperator"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
"testing"
"github.com/stretchr/testify/require"
)
type MockBackend struct {
mock.Mock
authorizer acl.Authorizer
}
func (m *MockBackend) TransferLeader(ctx context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error) {
called := m.Called(ctx, request)
ret := called.Get(0)
if ret == nil {
return nil, called.Error(1)
}
return ret.(*pboperator.TransferLeaderResponse), called.Error(1)
}
func (m *MockBackend) ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error) {
return resolver.Result{Authorizer: m.authorizer}, nil
}
func TestLeaderTransfer_ACL_Deny(t *testing.T) {
authorizer := acl.MockAuthorizer{}
authorizer.On("OperatorWrite", mock.Anything).Return(acl.Deny)
server := NewServer(Config{Datacenter: "dc1", Backend: &MockBackend{authorizer: &authorizer}, Logger: hclog.New(nil), ForwardRPC: doForwardRPC})
_, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{})
require.Error(t, err)
require.Equal(t, "Permission denied: provided token lacks permission 'operator:write'", err.Error())
}
func TestLeaderTransfer_ACL_Allowed(t *testing.T) {
authorizer := &acl.MockAuthorizer{}
authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow)
backend := &MockBackend{authorizer: authorizer}
backend.On("TransferLeader", mock.Anything, mock.Anything).Return(nil, nil)
server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: doForwardRPC})
_, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{})
require.NoError(t, err)
}
func TestLeaderTransfer_LeaderTransfer_Fail(t *testing.T) {
authorizer := &acl.MockAuthorizer{}
authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow)
backend := &MockBackend{authorizer: authorizer}
backend.On("TransferLeader", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("test"))
server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: doForwardRPC})
_, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{})
require.Error(t, err)
require.Equal(t, "test", err.Error())
}
func TestLeaderTransfer_LeaderTransfer_Success(t *testing.T) {
authorizer := &acl.MockAuthorizer{}
authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow)
backend := &MockBackend{authorizer: authorizer}
backend.On("TransferLeader", mock.Anything, mock.Anything).Return(&pboperator.TransferLeaderResponse{Success: true}, nil)
server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: doForwardRPC})
ret, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{})
require.NoError(t, err)
require.NotNil(t, ret)
require.True(t, ret.Success)
}
func TestLeaderTransfer_LeaderTransfer_ForwardRPC(t *testing.T) {
authorizer := &acl.MockAuthorizer{}
authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow)
backend := &MockBackend{authorizer: authorizer}
backend.On("TransferLeader", mock.Anything, mock.Anything).Return(&pboperator.TransferLeaderResponse{}, nil)
server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: noopForwardRPC})
ret, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{})
require.NoError(t, err)
require.NotNil(t, ret)
require.False(t, ret.Success)
}
func noopForwardRPC(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) {
return true, nil
}
func doForwardRPC(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) {
return false, nil
}

View File

@ -36,6 +36,11 @@ type RaftConfiguration struct {
Index uint64 Index uint64
} }
// TransferLeaderResponse is returned when querying for the current Raft configuration.
type TransferLeaderResponse struct {
Success bool
}
// RaftGetConfiguration is used to query the current Raft peer set. // RaftGetConfiguration is used to query the current Raft peer set.
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) { func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
r := op.c.newRequest("GET", "/v1/operator/raft/configuration") r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
@ -56,6 +61,26 @@ func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, e
return &out, nil return &out, nil
} }
// RaftLeaderTransfer is used to transfer the current raft leader to another node
func (op *Operator) RaftLeaderTransfer(q *QueryOptions) (*TransferLeaderResponse, error) {
r := op.c.newRequest("POST", "/v1/operator/raft/transfer-leader")
r.setQueryOptions(q)
_, resp, err := op.c.doRequest(r)
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
if err := requireOK(resp); err != nil {
return nil, err
}
var out TransferLeaderResponse
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return &out, nil
}
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft // RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
// quorum but no longer known to Serf or the catalog) by address in the form of // quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port". // "IP:port".

View File

@ -36,3 +36,21 @@ func TestAPI_OperatorRaftRemovePeerByAddress(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
} }
func TestAPI_OperatorRaftLeaderTransfer(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
// If we get this error, it proves we sent the address all the way
// through.
operator := c.Operator()
transfer, err := operator.RaftLeaderTransfer(nil)
if err == nil || !strings.Contains(err.Error(),
"cannot find peer") {
t.Fatalf("err: %v", err)
}
if transfer != nil {
t.Fatalf("err:%v", transfer)
}
}

View File

@ -0,0 +1,90 @@
package transferleader
import (
"flag"
"fmt"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
"github.com/mitchellh/cli"
)
func New(ui cli.Ui) *cmd {
c := &cmd{UI: ui}
c.init()
return c
}
type cmd struct {
UI cli.Ui
flags *flag.FlagSet
http *flags.HTTPFlags
help string
id string
}
func (c *cmd) init() {
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
c.http = &flags.HTTPFlags{}
c.flags.StringVar(&c.id, "id", "",
"The ID to remove from the Raft configuration.")
flags.Merge(c.flags, c.http.ClientFlags())
flags.Merge(c.flags, c.http.ServerFlags())
c.help = flags.Usage(help, c.flags)
}
func (c *cmd) Run(args []string) int {
if err := c.flags.Parse(args); err != nil {
if err == flag.ErrHelp {
return 0
}
c.UI.Error(fmt.Sprintf("Failed to parse args: %v", err))
return 1
}
// Set up a client.
client, err := c.http.APIClient()
if err != nil {
c.UI.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// Fetch the current configuration.
result, err := raftTransferLeader(client, c.http.Stale())
if err != nil {
c.UI.Error(fmt.Sprintf("Error transfering leadership: %v", err))
return 1
}
c.UI.Output(result)
return 0
}
func raftTransferLeader(client *api.Client, stale bool) (string, error) {
q := &api.QueryOptions{
AllowStale: stale,
}
reply, err := client.Operator().RaftLeaderTransfer(q)
if err != nil {
return "", fmt.Errorf("Failed to transfer leadership %w", err)
}
if !reply.Success {
return "", fmt.Errorf("Failed to transfer leadership")
}
return "Success", nil
}
func (c *cmd) Synopsis() string {
return synopsis
}
func (c *cmd) Help() string {
return c.help
}
const synopsis = "Transfer raft leadership to another node"
const help = `
Usage: consul operator raft transfer-leader [options]
Transfer raft leadership to another node.
`

View File

@ -0,0 +1,43 @@
package transferleader
import (
"github.com/hashicorp/consul/agent"
"github.com/mitchellh/cli"
"strings"
"testing"
)
func TestOperatorRaftTransferLeaderCommand_noTabs(t *testing.T) {
t.Parallel()
if strings.ContainsRune(New(cli.NewMockUi()).Help(), '\t') {
t.Fatal("help has tabs")
}
}
// This only test that the command behave correctly when only one agent is present
// and no leadership transfer is possible, testing for the functionality will be done at the RPC level.
func TestOperatorRaftTransferLeaderWithSingleNode(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := agent.NewTestAgent(t, ``)
defer a.Shutdown()
expected := "cannot find peer"
// Test the transfer-leader subcommand directly
ui := cli.NewMockUi()
c := New(ui)
args := []string{"-http-addr=" + a.HTTPAddr()}
code := c.Run(args)
if code != 1 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
output := strings.TrimSpace(ui.ErrorWriter.String())
if !strings.Contains(output, expected) {
t.Fatalf("bad: %q, %q", output, expected)
}
}

View File

@ -2,6 +2,7 @@ package command
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/command/operator/raft/transferleader"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
@ -220,6 +221,7 @@ func RegisteredCommands(ui cli.Ui) map[string]mcli.CommandFactory {
entry{"operator raft", func(cli.Ui) (cli.Command, error) { return operraft.New(), nil }}, entry{"operator raft", func(cli.Ui) (cli.Command, error) { return operraft.New(), nil }},
entry{"operator raft list-peers", func(ui cli.Ui) (cli.Command, error) { return operraftlist.New(ui), nil }}, entry{"operator raft list-peers", func(ui cli.Ui) (cli.Command, error) { return operraftlist.New(ui), nil }},
entry{"operator raft remove-peer", func(ui cli.Ui) (cli.Command, error) { return operraftremove.New(ui), nil }}, entry{"operator raft remove-peer", func(ui cli.Ui) (cli.Command, error) { return operraftremove.New(ui), nil }},
entry{"operator raft transfer-leader", func(ui cli.Ui) (cli.Command, error) { return transferleader.New(ui), nil }},
entry{"peering", func(cli.Ui) (cli.Command, error) { return peering.New(), nil }}, entry{"peering", func(cli.Ui) (cli.Command, error) { return peering.New(), nil }},
entry{"peering delete", func(ui cli.Ui) (cli.Command, error) { return peerdelete.New(ui), nil }}, entry{"peering delete", func(ui cli.Ui) (cli.Command, error) { return peerdelete.New(ui), nil }},
entry{"peering generate-token", func(ui cli.Ui) (cli.Command, error) { return peergenerate.New(ui), nil }}, entry{"peering generate-token", func(ui cli.Ui) (cli.Command, error) { return peergenerate.New(ui), nil }},

View File

@ -0,0 +1,18 @@
// Code generated by mog. DO NOT EDIT.
package pboperator
import "github.com/hashicorp/consul/api"
func TransferLeaderResponseToAPI(s *TransferLeaderResponse, t *api.TransferLeaderResponse) {
if s == nil {
return
}
t.Success = s.Success
}
func TransferLeaderResponseFromAPI(t *api.TransferLeaderResponse, s *TransferLeaderResponse) {
if s == nil {
return
}
s.Success = t.Success
}

View File

@ -0,0 +1,28 @@
// Code generated by protoc-gen-go-binary. DO NOT EDIT.
// source: proto/pboperator/operator.proto
package pboperator
import (
"github.com/golang/protobuf/proto"
)
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *TransferLeaderRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *TransferLeaderRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *TransferLeaderResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *TransferLeaderResponse) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}

View File

@ -0,0 +1,242 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc (unknown)
// source: proto/pboperator/operator.proto
package pboperator
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type TransferLeaderRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ID string `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"`
}
func (x *TransferLeaderRequest) Reset() {
*x = TransferLeaderRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pboperator_operator_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *TransferLeaderRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*TransferLeaderRequest) ProtoMessage() {}
func (x *TransferLeaderRequest) ProtoReflect() protoreflect.Message {
mi := &file_proto_pboperator_operator_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use TransferLeaderRequest.ProtoReflect.Descriptor instead.
func (*TransferLeaderRequest) Descriptor() ([]byte, []int) {
return file_proto_pboperator_operator_proto_rawDescGZIP(), []int{0}
}
func (x *TransferLeaderRequest) GetID() string {
if x != nil {
return x.ID
}
return ""
}
// mog annotation:
//
// target=github.com/hashicorp/consul/api.TransferLeaderResponse
// output=operator.gen.go
// name=API
type TransferLeaderResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// true if the transfer is a success
Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
}
func (x *TransferLeaderResponse) Reset() {
*x = TransferLeaderResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pboperator_operator_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *TransferLeaderResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*TransferLeaderResponse) ProtoMessage() {}
func (x *TransferLeaderResponse) ProtoReflect() protoreflect.Message {
mi := &file_proto_pboperator_operator_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use TransferLeaderResponse.ProtoReflect.Descriptor instead.
func (*TransferLeaderResponse) Descriptor() ([]byte, []int) {
return file_proto_pboperator_operator_proto_rawDescGZIP(), []int{1}
}
func (x *TransferLeaderResponse) GetSuccess() bool {
if x != nil {
return x.Success
}
return false
}
var File_proto_pboperator_operator_proto protoreflect.FileDescriptor
var file_proto_pboperator_operator_proto_rawDesc = []byte{
0x0a, 0x1f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74,
0x6f, 0x72, 0x2f, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x22, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e,
0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x6f, 0x70, 0x65,
0x72, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x27, 0x0a, 0x15, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65,
0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e,
0x0a, 0x02, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x44, 0x22, 0x32,
0x0a, 0x16, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63,
0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65,
0x73, 0x73, 0x32, 0x9d, 0x01, 0x0a, 0x0f, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x53,
0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x0e, 0x54, 0x72, 0x61, 0x6e, 0x73,
0x66, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x39, 0x2e, 0x68, 0x61, 0x73, 0x68,
0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74,
0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x54,
0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x3a, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70,
0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
0x2e, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66,
0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x22, 0x00, 0x42, 0x91, 0x02, 0x0a, 0x26, 0x63, 0x6f, 0x6d, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69,
0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65,
0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x42, 0x0d, 0x4f,
0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2c,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69,
0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x2f, 0x70, 0x62, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0xa2, 0x02, 0x04, 0x48,
0x43, 0x49, 0x4f, 0xaa, 0x02, 0x22, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e,
0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e,
0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0xca, 0x02, 0x22, 0x48, 0x61, 0x73, 0x68, 0x69,
0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, 0x74, 0x65,
0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0xe2, 0x02, 0x2e,
0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c,
0x5c, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74,
0x6f, 0x72, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02,
0x25, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x3a, 0x3a, 0x43, 0x6f, 0x6e, 0x73,
0x75, 0x6c, 0x3a, 0x3a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x3a, 0x3a, 0x4f, 0x70,
0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_proto_pboperator_operator_proto_rawDescOnce sync.Once
file_proto_pboperator_operator_proto_rawDescData = file_proto_pboperator_operator_proto_rawDesc
)
func file_proto_pboperator_operator_proto_rawDescGZIP() []byte {
file_proto_pboperator_operator_proto_rawDescOnce.Do(func() {
file_proto_pboperator_operator_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_pboperator_operator_proto_rawDescData)
})
return file_proto_pboperator_operator_proto_rawDescData
}
var file_proto_pboperator_operator_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_proto_pboperator_operator_proto_goTypes = []interface{}{
(*TransferLeaderRequest)(nil), // 0: hashicorp.consul.internal.operator.TransferLeaderRequest
(*TransferLeaderResponse)(nil), // 1: hashicorp.consul.internal.operator.TransferLeaderResponse
}
var file_proto_pboperator_operator_proto_depIdxs = []int32{
0, // 0: hashicorp.consul.internal.operator.OperatorService.TransferLeader:input_type -> hashicorp.consul.internal.operator.TransferLeaderRequest
1, // 1: hashicorp.consul.internal.operator.OperatorService.TransferLeader:output_type -> hashicorp.consul.internal.operator.TransferLeaderResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_proto_pboperator_operator_proto_init() }
func file_proto_pboperator_operator_proto_init() {
if File_proto_pboperator_operator_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_proto_pboperator_operator_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TransferLeaderRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_pboperator_operator_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TransferLeaderResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_pboperator_operator_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_proto_pboperator_operator_proto_goTypes,
DependencyIndexes: file_proto_pboperator_operator_proto_depIdxs,
MessageInfos: file_proto_pboperator_operator_proto_msgTypes,
}.Build()
File_proto_pboperator_operator_proto = out.File
file_proto_pboperator_operator_proto_rawDesc = nil
file_proto_pboperator_operator_proto_goTypes = nil
file_proto_pboperator_operator_proto_depIdxs = nil
}

View File

@ -0,0 +1,24 @@
syntax = "proto3";
package hashicorp.consul.internal.operator;
// Operator defines a set of operators operation applicable to Consul
service OperatorService {
//Transfer raft leadership to another node
rpc TransferLeader(TransferLeaderRequest) returns (TransferLeaderResponse) {}
}
message TransferLeaderRequest {
string ID = 1;
}
//
// mog annotation:
//
// target=github.com/hashicorp/consul/api.TransferLeaderResponse
// output=operator.gen.go
// name=API
message TransferLeaderResponse {
// true if the transfer is a success
bool success = 1;
}

View File

@ -0,0 +1,105 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc (unknown)
// source: proto/pboperator/operator.proto
package pboperator
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// OperatorServiceClient is the client API for OperatorService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type OperatorServiceClient interface {
// Transfer raft leadership to another node
TransferLeader(ctx context.Context, in *TransferLeaderRequest, opts ...grpc.CallOption) (*TransferLeaderResponse, error)
}
type operatorServiceClient struct {
cc grpc.ClientConnInterface
}
func NewOperatorServiceClient(cc grpc.ClientConnInterface) OperatorServiceClient {
return &operatorServiceClient{cc}
}
func (c *operatorServiceClient) TransferLeader(ctx context.Context, in *TransferLeaderRequest, opts ...grpc.CallOption) (*TransferLeaderResponse, error) {
out := new(TransferLeaderResponse)
err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.operator.OperatorService/TransferLeader", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// OperatorServiceServer is the server API for OperatorService service.
// All implementations should embed UnimplementedOperatorServiceServer
// for forward compatibility
type OperatorServiceServer interface {
// Transfer raft leadership to another node
TransferLeader(context.Context, *TransferLeaderRequest) (*TransferLeaderResponse, error)
}
// UnimplementedOperatorServiceServer should be embedded to have forward compatible implementations.
type UnimplementedOperatorServiceServer struct {
}
func (UnimplementedOperatorServiceServer) TransferLeader(context.Context, *TransferLeaderRequest) (*TransferLeaderResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method TransferLeader not implemented")
}
// UnsafeOperatorServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to OperatorServiceServer will
// result in compilation errors.
type UnsafeOperatorServiceServer interface {
mustEmbedUnimplementedOperatorServiceServer()
}
func RegisterOperatorServiceServer(s grpc.ServiceRegistrar, srv OperatorServiceServer) {
s.RegisterService(&OperatorService_ServiceDesc, srv)
}
func _OperatorService_TransferLeader_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TransferLeaderRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(OperatorServiceServer).TransferLeader(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/hashicorp.consul.internal.operator.OperatorService/TransferLeader",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(OperatorServiceServer).TransferLeader(ctx, req.(*TransferLeaderRequest))
}
return interceptor(ctx, in, info, handler)
}
// OperatorService_ServiceDesc is the grpc.ServiceDesc for OperatorService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var OperatorService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "hashicorp.consul.internal.operator.OperatorService",
HandlerType: (*OperatorServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "TransferLeader",
Handler: _OperatorService_TransferLeader_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "proto/pboperator/operator.proto",
}

View File

@ -145,3 +145,37 @@ $ curl \
--request DELETE \ --request DELETE \
"http://127.0.0.1:8500/v1/operator/raft/peer?address=1.2.3.4:5678" "http://127.0.0.1:8500/v1/operator/raft/peer?address=1.2.3.4:5678"
``` ```
## Transfer Raft Leadership
This endpoint transfers the Raft leadership from the current leader to a different Raft peer.
The new leader is selected at random unless explicitly specified with the `id` parameter.
| Method | Path | Produces |
| -------- | ------------------------------- | ------------------ |
| `POST` | `/operator/raft/transfer-leader` | `application/json` |
The table below shows this endpoint's support for
[blocking queries](/api-docs/features/blocking),
[consistency modes](/api-docs/features/consistency),
[agent caching](/api-docs/features/caching), and
[required ACLs](/api-docs#authentication).
| Blocking Queries | Consistency Modes | Agent Caching | ACL Required |
| ---------------- | ----------------- | ------------- | ---------------- |
| `NO` | `none` | `none` | `operator:write` |
The corresponding CLI command is [`consul operator raft transfer-leader`](/commands/operator/raft#transfer-leader).
### Query Parameters
- `id` `(string: "")` - Specifies the node ID of the Raft peer to transfer leadership to.
If empty, leadership transfers to a random server agent.
### Sample Request
```shell-session
$ curl \
--request POST \
"http://127.0.0.1:8500/v1/operator/raft/transfer-leader?id=09cfc046-e74a-ad49-1aad-c2161b7fe677"
```

View File

@ -104,3 +104,26 @@ Usage: `consul operator raft remove-peer -address="IP:port"`
- `-id` - ID of the server to remove. - `-id` - ID of the server to remove.
The return code will indicate success or failure. The return code will indicate success or failure.
## transfer-leader
Corresponding HTTP API Endpoint: [\[POST\] /v1/operator/raft/transfer-leader](/api-docs/operator/raft#transfer-raft-leadership)
This command transfers Raft leadership to another server agent. If an `id` is provided, Consul transfers leadership to the server with that id.
Use this command to change leadership without restarting the leader node, which maintains quorum and workload capacity.
The table below shows this command's [required ACLs](/api-docs#authentication). Configuration of
[blocking queries](/api-docs/features/blocking) and [agent caching](/api-docs/features/caching)
are not supported from commands, but may be from the corresponding HTTP endpoint.
| ACL Required |
| ---------------- |
| `operator:write` |
Usage: `consul operator raft transfer-leader -id="server id"`
- `-id` - Specifies the node ID of the raft peer to transfer leadership to.
If empty, leadership transfers to a random server agent.
The return code indicates success or failure.