3 changes: 3 additions & 0 deletions internal/csi-common/utils.go
@@ -209,6 +209,9 @@ func getReqID(req interface{}) string {
case *csi.ControllerExpandVolumeRequest:
reqID = r.GetVolumeId()

case *csi.ControllerModifyVolumeRequest:
reqID = r.GetVolumeId()

case *csi.ControllerPublishVolumeRequest:
reqID = r.GetVolumeId()
case *csi.ControllerUnpublishVolumeRequest:
222 changes: 214 additions & 8 deletions internal/nvmeof/controller/controllerserver.go
@@ -29,6 +29,7 @@ import (

csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/nvmeof"
nvmeoferrors "github.com/ceph/ceph-csi/internal/nvmeof/errors"
"github.com/ceph/ceph-csi/internal/rbd"
rbdutil "github.com/ceph/ceph-csi/internal/rbd"
rbddriver "github.com/ceph/ceph-csi/internal/rbd/driver"
@@ -282,7 +283,7 @@ func (cs *Server) ControllerUnpublishVolume(
if err != nil {
log.ErrorLog(ctx, "failed to get NVMe-oF metadata for volumeID %s: %v", volumeID, err)

return nil, status.Errorf(codes.Internal, "failed to get NVMe-oF metadata: %v", err)
return nil, nvmeoferrors.ToGRPCError(err)
}

// Unpublish NVMe-oF resources
@@ -293,6 +294,54 @@
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
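
Note: the unpublish error path now goes through nvmeoferrors.ToGRPCError instead of always returning codes.Internal. The actual mapping lives in internal/nvmeof/errors and is not part of this diff; the sketch below is only an assumption of its shape, using the sentinel errors introduced elsewhere in this change.

// Hypothetical sketch of ToGRPCError; not the actual implementation.
func toGRPCErrorSketch(err error) error {
	switch {
	case errors.Is(err, nvmeoferrors.ErrMetadataNotFound):
		// assumed mapping: missing NVMe-oF metadata is reported as NotFound
		return status.Errorf(codes.NotFound, "%v", err)
	case errors.Is(err, nvmeoferrors.ErrMetadataCorrupted):
		// assumed mapping: unparsable metadata stays an Internal error
		return status.Errorf(codes.Internal, "%v", err)
	default:
		return status.Errorf(codes.Internal, "%v", err)
	}
}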

// ControllerModifyVolume modifies the volume's QoS parameters.
func (cs *Server) ControllerModifyVolume(
ctx context.Context,
req *csi.ControllerModifyVolumeRequest,
) (*csi.ControllerModifyVolumeResponse, error) {
volumeID := req.GetVolumeId()
params := req.GetMutableParameters()

// Step 1: Acquire volume lock
if acquired := cs.volumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer cs.volumeLocks.Release(volumeID)

// Step 2: Parse QoS parameters from mutable_parameters
hasRBDQoS := rbd.HasQoSParams(params)
if hasRBDQoS {
log.ErrorLog(ctx, "Cannot set RBD QoS parameters on NVMe-oF volumes")

return nil, status.Error(codes.InvalidArgument, "cannot set RBD QoS parameters on NVMe-oF volumes")
}
nvmeofQoS, err := parseQoSParameters(params)
if err != nil {
log.ErrorLog(ctx, "failed to parse NVMe-oF QoS parameters: %v", err)

return nil, status.Errorf(codes.InvalidArgument, "failed to parse QoS parameters: %v", err)
}
if nvmeofQoS != nil {
return cs.modifyNVMeoFQoS(ctx, req, nvmeofQoS)
}

return &csi.ControllerModifyVolumeResponse{}, nil
}
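
The mutable parameters handled here come from the VolumeAttributesClass that the PVC references via volumeAttributesClassName; the external-resizer sidecar forwards them in the ModifyVolume request. A minimal illustration of such a request, with an elided volume ID and an assumed value (the key is the nvmeof.RwIosPerSecond constant used by parseQoSParameters below):

// Illustration only; not part of this change.
modifyReq := &csi.ControllerModifyVolumeRequest{
	VolumeId: "<volume-id>", // elided
	MutableParameters: map[string]string{
		nvmeof.RwIosPerSecond: "1000", // assumed value: limit read+write IOPS to 1000
	},
}
_ = modifyReq // passed to ControllerModifyVolume by the CO / external-resizer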

// ControllerExpandVolume handles volume expansion requests.
// For now it only returns Unimplemented, as NVMe-oF volume expansion is not supported yet.
// The stub is required because ControllerModifyVolume needs the csi-resizer sidecar, and
// csi-resizer checks for the ControllerServiceCapability_RPC_EXPAND_VOLUME capability.
// Once the NVMe-oF gateway supports volume expansion, the real logic must be added here.
Member:
This sounds like a shortcoming of the external-resizer sidecar. Only ModifyVolume should be allowed. But there seems to be a required check on volume resize: https://github.com/kubernetes-csi/external-resizer/blob/master/pkg/resizer/csi_resizer.go#L59

Collaborator:
@nixpanic I think we should open a tracker with the external-resizer, as there could be drivers that support ModifyVolume but not ExpandVolume.

Member:
Created kubernetes-csi/external-resizer#545 for that, but it is a draft for now as I have not tested it yet.

Member:
This was merged. The ControllerExpandVolume procedure can be kept until the external-resizer has a release. (Or maybe by that time NVMe-oF implements resizing too.)

Contributor (author):
@nixpanic, Aviv told me we should implement ControllerExpandVolume() (resize the NVMe-oF volume), so in any case we need this function.

Member:
Yes, ControllerExpandVolume needs to be implemented too, but in a separate PR 😄

func (cs *Server) ControllerExpandVolume(
ctx context.Context,
req *csi.ControllerExpandVolumeRequest,
) (*csi.ControllerExpandVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "ControllerExpandVolume is not implemented for NVMe-oF volumes")
}
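
For context on the review discussion above: the external-resizer sidecar, which is also the component that issues ControllerModifyVolume, refuses to work with a driver that does not advertise expansion, hence the Unimplemented stub. A hedged sketch of the capability pair this implies; where ceph-csi actually registers its controller capabilities is outside this diff, so the snippet is illustrative only.

// Illustrative only: advertise both capabilities so the external-resizer
// accepts the driver; EXPAND_VOLUME is backed by the Unimplemented stub above
// until real NVMe-oF resizing is implemented.
caps := []csi.ControllerServiceCapability_RPC_Type{
	csi.ControllerServiceCapability_RPC_MODIFY_VOLUME,
	csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
}
_ = caps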

// validateCreateVolumeRequest validates the incoming request for NVMe-oF.
// The rest of the parameters are validated by RBD.
func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
@@ -312,6 +361,22 @@ func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err != nil {
return fmt.Errorf("invalid listeners parameter: %w", err)
}
// Validate QoS parameters - cannot mix RBD and NVMe-oF QoS
mutableParams := req.GetMutableParameters()

// check for RBD QoS parameters in both params and mutableParams
if hasRBDQoS := rbd.HasQoSParams(params); hasRBDQoS {
return errors.New("setting RBD QoS parameters on NVMe-oF volumes is not supported")
}
if hasRBDQoS := rbd.HasQoSParams(mutableParams); hasRBDQoS {
return errors.New("setting RBD QoS parameters on NVMe-oF volumes is not supported")
}

// The mutableParams values come from the VolumeAttributesClass referenced by volumeAttributesClassName in the PersistentVolumeClaim.
_, err = parseQoSParameters(mutableParams)
if err != nil {
return fmt.Errorf("invalid NVMe-oF QoS parameters: %w", err)
}

return nil
}
@@ -351,6 +416,123 @@ func parseListeners(listenersJSON string) ([]nvmeof.ListenerDetails, error) {
return listeners, nil
}

// parseQoSParameters extracts and parses QoS parameters from the given map.
func parseQoSParameters(params map[string]string) (*nvmeof.NVMeoFQosVolume, error) {
qos := &nvmeof.NVMeoFQosVolume{}
hasAnyQoS := false

parseParam := func(key, name string, dest **uint64) error {
if val, exists := params[key]; exists && val != "" {
parsed, err := strconv.ParseUint(val, 10, 64)
if err != nil {
return fmt.Errorf("invalid %s: %w", name, err)
}
*dest = &parsed
hasAnyQoS = true
}

return nil
}

if err := parseParam(nvmeof.RwIosPerSecond, nvmeof.RwIosPerSecond, &qos.RwIosPerSecond); err != nil {
Collaborator:
What will the behavior be if 0 is set for these keys? Is it considered infinite or disabled?

Contributor (author):
It is considered infinite, but "disable QoS" means removing the limitation, so I think the terms amount to the same thing.

return nil, err
}
if err := parseParam(nvmeof.RwMbytesPerSecond, nvmeof.RwMbytesPerSecond, &qos.RwMbytesPerSecond); err != nil {
return nil, err
}
if err := parseParam(nvmeof.RMbytesPerSecond, nvmeof.RMbytesPerSecond, &qos.RMbytesPerSecond); err != nil {
return nil, err
}
if err := parseParam(nvmeof.WMbytesPerSecond, nvmeof.WMbytesPerSecond, &qos.WMbytesPerSecond); err != nil {
return nil, err
}

if !hasAnyQoS {
return nil, nil
}

return qos, nil
}
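
A short usage sketch of the parser above, with illustrative values only (the literal strings behind the nvmeof.* key constants are not shown in this diff): a map without any QoS keys yields nil, a numeric value is parsed into the matching field, and a non-numeric value is rejected.

// Illustration only; not part of this change.
qos, err := parseQoSParameters(map[string]string{
	nvmeof.RwMbytesPerSecond: "200", // assumed value: cap combined read/write throughput
})
if err == nil && qos != nil {
	// qos.RwMbytesPerSecond now points at the parsed value 200
}
empty, err := parseQoSParameters(map[string]string{})
if err == nil && empty == nil {
	// no QoS keys were supplied, so callers skip QoS configuration entirely
}
if _, err := parseQoSParameters(map[string]string{nvmeof.RwIosPerSecond: "fast"}); err != nil {
	// non-numeric values fail with a wrapped strconv error
}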

// modifyNVMeoFQoS handles NVMe-oF gateway QoS modification.
func (cs *Server) modifyNVMeoFQoS(
ctx context.Context,
req *csi.ControllerModifyVolumeRequest,
qos *nvmeof.NVMeoFQosVolume,
) (*csi.ControllerModifyVolumeResponse, error) {
volumeID := req.GetVolumeId()

// Step 1: Get secrets

// ControllerModifyVolume does not receive the volume context and has no way to obtain secrets,
// because there is no "csi.storage.k8s.io/controller-modify-secret-name" field in the SC.
// The full solution is to use GetControllerExpandSecretRef, but there is no such function yet.
// TODO: change the call to GetControllerExpandSecretRef once it is implemented.
secrets := req.GetSecrets()
if secrets == nil {
secretName, secretNamespace, err := util.GetControllerPublishSecretRef(volumeID, util.RBDType)
if err != nil {
log.ErrorLog(ctx, "Failed to get secret reference: %v", err)

return nil, status.Errorf(codes.Internal, "failed to get secret reference: %v", err)
}

secrets, err = k8s.GetSecret(secretName, secretNamespace)
if err != nil {
log.ErrorLog(ctx, "Failed to get secret from k8s: %v", err)

return nil, status.Errorf(codes.Internal, "failed to get secret: %v", err)
}
}

// Step 2: Get NVMe-oF metadata
nvmeofData, err := cs.getNVMeoFMetadata(ctx, secrets, volumeID)
if err != nil {
log.ErrorLog(ctx, "Failed to get NVMe-oF metadata: %v", err)

return nil, nvmeoferrors.ToGRPCError(err)
}

// Step 3: Connect to gateway
config := &nvmeof.GatewayConfig{
Address: nvmeofData.GatewayManagementInfo.Address,
Port: nvmeofData.GatewayManagementInfo.Port,
}
gateway, err := connectGateway(ctx, config)
if err != nil {
log.ErrorLog(ctx, "Gateway connection failed: %v", err)

return nil, status.Errorf(codes.Unavailable, "gateway connection failed: %v", err)
}
defer func() {
if closeErr := gateway.Destroy(); closeErr != nil {
log.ErrorLog(ctx, "Failed to close gateway connection: %v", closeErr)
}
}()

// Step 4: Apply NVMe-oF QoS via gateway
log.DebugLog(ctx, "Setting QoS for subsystem=%s, nsid=%d", nvmeofData.SubsystemNQN, nvmeofData.NamespaceID)

err = gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, *qos)
if err != nil {
// Check if error is EEXIST (RBD QoS already set)
if errors.Is(err, nvmeoferrors.ErrRbdQoSExists) {
log.ErrorLog(ctx, "RBD QoS already configured on volume")

return nil, status.Error(codes.InvalidArgument,
"RBD QoS already configured on this volume, cannot set NVMe-oF gateway QoS")
}

log.ErrorLog(ctx, "Failed to set QoS limits: %v", err)

return nil, status.Errorf(codes.Internal, "failed to set QoS limits: %v", err)
}

log.DebugLog(ctx, "Successfully modified NVMe-oF QoS for volume %s", volumeID)

return &csi.ControllerModifyVolumeResponse{}, nil
}

// ensureSubsystem checks if the subsystem exists and creates it if not,
// then creates the listener.
func ensureSubsystem(
@@ -434,7 +616,6 @@ func cleanupEmptySubsystem(
}

// createNVMeoFResources sets up the NVMe-oF resources for the given RBD volume.
// TODO - need to support multiple listeners.
func (cs *Server) createNVMeoFResources(
ctx context.Context,
req *csi.CreateVolumeRequest,
@@ -465,6 +646,16 @@ func (cs *Server) createNVMeoFResources(
Port: uint32(nvmeofGatewayPort),
},
}
// Extract QoS parameters, if any.
mutableParams := req.GetMutableParameters()
// The mutableParams values come from the VolumeAttributesClass referenced by volumeAttributesClassName in the PersistentVolumeClaim.
// validateCreateVolumeRequest has already verified that no RBD QoS parameters are present.
nvmeofQoS, err := parseQoSParameters(mutableParams)
if err != nil {
log.ErrorLog(ctx, "failed to parse NVMe-oF QoS parameters: %v", err)

return nil, fmt.Errorf("failed to parse QoS parameters: %w", err)
}

// Step 2: Connect to gateway
config, err := getGatewayConfigFromRequest(params)
@@ -505,6 +696,15 @@ func (cs *Server) createNVMeoFResources(
log.DebugLog(ctx, "Namespace created: %s/%s with NSID: %d", rbdPoolName, rbdImageName, nsid)
nvmeofData.NamespaceID = nsid

// Step 5: Set QoS limits if any
if nvmeofQoS != nil {
log.DebugLog(ctx, "Setting QoS limits: %s", nvmeofQoS)
if err := gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID,
*nvmeofQoS); err != nil {
return nil, fmt.Errorf("setting QoS limits failed: %w", err)
}
}

uuid, err := gateway.GetUUIDBySubsystemAndNameSpaceID(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID)
if err != nil {
return nil, fmt.Errorf("get namespace uuid failed: %w", err)
@@ -799,7 +999,8 @@ func (cs *Server) getNVMeoFMetadata(
// Get RBD volume
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
return nil, fmt.Errorf("failed to find volume with ID %q: %w", volumeID, err)
return nil, fmt.Errorf("%w: failed to find volume with ID %q: %w",
nvmeoferrors.ErrMetadataNotFound, volumeID, err)
}
defer rbdVol.Destroy(ctx)

@@ -820,29 +1021,34 @@
for _, key := range requiredKeys {
value, err := rbdVol.GetMetadata(key)
if err != nil {
return nil, fmt.Errorf("failed to get %s: %w", key, err)
return nil, fmt.Errorf("%w: failed to get %s: %w",
nvmeoferrors.ErrMetadataNotFound, key, err)
}
if value == "" {
return nil, fmt.Errorf("metadata %s is empty", key)
return nil, fmt.Errorf("%w: metadata %s is empty",
nvmeoferrors.ErrMetadataNotFound, key)
}
metadata[key] = value
}

// Parse namespace ID
nsid, err := strconv.ParseUint(metadata[toRBDMetadataKey(vcNamespaceID)], 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid namespace ID: %w", err)
return nil, fmt.Errorf("%w: invalid namespace ID: %w",
nvmeoferrors.ErrMetadataCorrupted, err)
}

gatewayPort, err := strconv.ParseUint(metadata[toRBDMetadataKey(vcGatewayPort)], 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid gateway port: %w", err)
return nil, fmt.Errorf("%w: invalid gateway port: %w",
nvmeoferrors.ErrMetadataCorrupted, err)
}

// Parse listeners from JSON
var listeners []nvmeof.ListenerDetails
if err := json.Unmarshal([]byte(metadata[toRBDMetadataKey(vcListeners)]), &listeners); err != nil {
return nil, fmt.Errorf("failed to parse listeners JSON: %w", err)
return nil, fmt.Errorf("%w: failed to parse listeners JSON: %w",
nvmeoferrors.ErrMetadataCorrupted, err)
}

// Construct NVMe-oF volume data