22e9f679c3
This changeset implements the remaining controller detach RPCs: server-to-client and client-to-controller. The tests also uncovered a bug in our RPC for claims which is fixed here; the volume claim RPC is used for both claiming and releasing a claim on a volume. We should only submit a controller publish RPC when the claim is new and not when it's being released.
119 lines
3.3 KiB
Go
119 lines
3.3 KiB
Go
package nomad
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
metrics "github.com/armon/go-metrics"
|
|
log "github.com/hashicorp/go-hclog"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
)
|
|
|
|
// ClientCSIController is used to forward RPC requests to the targed Nomad client's
|
|
// CSIController endpoint.
|
|
type ClientCSIController struct {
|
|
srv *Server
|
|
logger log.Logger
|
|
}
|
|
|
|
func (a *ClientCSIController) AttachVolume(args *cstructs.ClientCSIControllerAttachVolumeRequest, reply *cstructs.ClientCSIControllerAttachVolumeResponse) error {
|
|
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "attach_volume"}, time.Now())
|
|
|
|
// Verify the arguments.
|
|
if args.ControllerNodeID == "" {
|
|
return errors.New("missing ControllerNodeID")
|
|
}
|
|
|
|
// Make sure Node is valid and new enough to support RPC
|
|
snap, err := a.srv.State().Snapshot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = getNodeForRpc(snap, args.ControllerNodeID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get the connection to the client
|
|
state, ok := a.srv.getNodeConn(args.ControllerNodeID)
|
|
if !ok {
|
|
return findNodeConnAndForward(a.srv, args.ControllerNodeID, "ClientCSIController.AttachVolume", args, reply)
|
|
}
|
|
|
|
// Make the RPC
|
|
err = NodeRpc(state.Session, "CSIController.AttachVolume", args, reply)
|
|
if err != nil {
|
|
return fmt.Errorf("attach volume: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *ClientCSIController) ValidateVolume(args *cstructs.ClientCSIControllerValidateVolumeRequest, reply *cstructs.ClientCSIControllerValidateVolumeResponse) error {
|
|
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "validate_volume"}, time.Now())
|
|
|
|
// Verify the arguments.
|
|
if args.ControllerNodeID == "" {
|
|
return errors.New("missing ControllerNodeID")
|
|
}
|
|
|
|
// Make sure Node is valid and new enough to support RPC
|
|
snap, err := a.srv.State().Snapshot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = getNodeForRpc(snap, args.ControllerNodeID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get the connection to the client
|
|
state, ok := a.srv.getNodeConn(args.ControllerNodeID)
|
|
if !ok {
|
|
return findNodeConnAndForward(a.srv, args.ControllerNodeID, "ClientCSIController.ValidateVolume", args, reply)
|
|
}
|
|
|
|
// Make the RPC
|
|
err = NodeRpc(state.Session, "CSIController.ValidateVolume", args, reply)
|
|
if err != nil {
|
|
return fmt.Errorf("validate volume: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *ClientCSIController) DetachVolume(args *cstructs.ClientCSIControllerDetachVolumeRequest, reply *cstructs.ClientCSIControllerDetachVolumeResponse) error {
|
|
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "detach_volume"}, time.Now())
|
|
|
|
// Verify the arguments.
|
|
if args.ControllerNodeID == "" {
|
|
return errors.New("missing ControllerNodeID")
|
|
}
|
|
|
|
// Make sure Node is valid and new enough to support RPC
|
|
snap, err := a.srv.State().Snapshot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = getNodeForRpc(snap, args.ControllerNodeID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get the connection to the client
|
|
state, ok := a.srv.getNodeConn(args.ControllerNodeID)
|
|
if !ok {
|
|
return findNodeConnAndForward(a.srv, args.ControllerNodeID, "ClientCSIController.DetachVolume", args, reply)
|
|
}
|
|
|
|
// Make the RPC
|
|
err = NodeRpc(state.Session, "CSIController.DetachVolume", args, reply)
|
|
if err != nil {
|
|
return fmt.Errorf("detach volume: %v", err)
|
|
}
|
|
return nil
|
|
|
|
}
|