Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions modules/nf-tower/src/main/io/seqera/tower/plugin/DatasetHelper.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright (c) 2019, Seqera Labs.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* This Source Code Form is "Incompatible With Secondary Licenses", as
* defined by the Mozilla Public License, v. 2.0.
*/

package io.seqera.tower.plugin

import groovy.json.JsonSlurper
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.Session
import nextflow.exception.AbortOperationException
import nextflow.util.SimpleHttpClient

/**
* Helper class to download datasets from Seqera Platform
*
* @author Edmund Miller
*/
@Slf4j
@CompileStatic
class DatasetHelper {

static private final String TOKEN_PREFIX = '@token:'

private String endpoint
private String accessToken
private SimpleHttpClient httpClient

DatasetHelper() {
this.endpoint = getEndpoint()
this.accessToken = getAccessToken()
this.httpClient = new SimpleHttpClient().setAuthToken(TOKEN_PREFIX + accessToken)
}

DatasetHelper(String endpoint, String accessToken) {
this.endpoint = endpoint
this.accessToken = accessToken
this.httpClient = new SimpleHttpClient().setAuthToken(TOKEN_PREFIX + accessToken)
}

/**
* Get the Tower endpoint URL from config or environment
*/
protected String getEndpoint() {
def session = Global.session as Session
def endpoint = session?.config?.navigate('tower.endpoint') as String
if( !endpoint || endpoint == '-' )
endpoint = TowerClient.DEF_ENDPOINT_URL
return endpoint
}

/**
* Get the Tower access token from config or environment
*/
protected String getAccessToken() {
def session = Global.session as Session
def token = session?.config?.navigate('tower.accessToken')
if( !token ) {
def env = System.getenv()
token = env.get('TOWER_ACCESS_TOKEN')
}
if( !token )
throw new AbortOperationException("Missing Nextflow Tower access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment")
return token
}

/**
* Download a dataset from Seqera Platform
*
* @param datasetId The dataset ID to download
* @param version The version of the dataset (defaults to latest)
* @param fileName The name of the file in the dataset (defaults to 'data.csv')
* @return The content of the dataset file as a String
*/
String downloadDataset(String datasetId, String version = null, String fileName = null) {
if( !datasetId )
throw new IllegalArgumentException("Dataset ID cannot be null or empty")

// TODO: When version is not specified, we should query the latest version
// For now, default to version 1 if not specified
final versionStr = version ?: '1'

// TODO: In the future, we should query the dataset metadata to get the actual filename
// For now, default to 'data.csv' if not specified
final fileNameStr = fileName ?: 'data.csv'

final url = buildDownloadUrl(datasetId, versionStr, fileNameStr)
log.debug "Downloading dataset from: $url"

try {
httpClient.sendHttpMessage(url, null, 'GET')
final responseCode = httpClient.responseCode

if( responseCode >= 200 && responseCode < 300 ) {
return httpClient.getResponse()
} else if( responseCode == 404 ) {
throw new AbortOperationException("Dataset not found: $datasetId (version: $versionStr, file: $fileNameStr)")
} else if( responseCode == 403 ) {
throw new AbortOperationException("Access denied to dataset: $datasetId -- Check your Tower access token permissions")
} else {
throw new AbortOperationException("Failed to download dataset: $datasetId -- HTTP status: $responseCode")
}
} catch( IOException e ) {
throw new AbortOperationException("Failed to download dataset: $datasetId -- ${e.message}", e)
}
}

/**
* Build the download URL for a dataset
*
* @param datasetId The dataset ID
* @param version The dataset version
* @param fileName The file name
* @return The complete download URL
*/
protected String buildDownloadUrl(String datasetId, String version, String fileName) {
return "${endpoint}/datasets/${datasetId}/v/${version}/n/${fileName}"
}

/**
* TODO: List all datasets in a workspace
* This will use the /datasets API endpoint
*
* @param workspaceId Optional workspace ID
* @return List of available datasets
*/
// Future implementation for listing datasets
// List<Map> listDatasets(String workspaceId = null) {
// final url = workspaceId
// ? "${endpoint}/datasets?workspaceId=${workspaceId}"
// : "${endpoint}/datasets"
//
// httpClient.sendHttpMessage(url, null, 'GET')
// if( httpClient.responseCode >= 200 && httpClient.responseCode < 300 ) {
// def json = new JsonSlurper().parseText(httpClient.response)
// return json.datasets as List<Map>
// }
// return []
// }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2019, Seqera Labs.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* This Source Code Form is "Incompatible With Secondary Licenses", as
* defined by the Mozilla Public License, v. 2.0.
*/

package io.seqera.tower.plugin

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.Global
import nextflow.NF
import nextflow.Session
import nextflow.dag.NodeMarker
import nextflow.extension.CH

/**
* Channel extension methods for Tower/Seqera Platform integration
*
* @author Edmund Miller
*/
@Slf4j
@CompileStatic
class TowerChannelExtension {

/**
* Download a dataset from Seqera Platform and return its content as a String
*
* This function downloads a dataset file from Seqera Platform.
* It can be used in combination with other Channel factory methods.
*
* Example usage:
* <pre>
* // Basic usage - returns the dataset content as a string
* def dataset = Channel.fromDataset('my-dataset-id')
*
* // With nf-schema integration
* ch_input = Channel.fromList(
* samplesheetToList(Channel.fromDataset(params.input), "assets/schema_input.json")
* )
*
* // Specify version and filename
* def dataset = Channel.fromDataset(
* datasetId: 'my-dataset-id',
* version: '2',
* fileName: 'samples.csv'
* )
* </pre>
*
* @param datasetId The dataset ID (when called with a string)
* @return The content of the dataset file as a String
*/
static String fromDataset(Channel self, String datasetId) {
return fromDataset(self, [datasetId: datasetId])
}

/**
* Download a dataset from Seqera Platform with options
*
* @param opts Map with options:
* - datasetId: (required) The dataset ID to download
* - version: (optional) The version of the dataset (defaults to '1')
* - fileName: (optional) The name of the file in the dataset (defaults to 'data.csv')
* @return The content of the dataset file as a String
*/
static String fromDataset(Channel self, Map opts) {
final datasetId = opts.datasetId as String
final version = opts.version as String
final fileName = opts.fileName as String

if( !datasetId )
throw new IllegalArgumentException("fromDataset requires 'datasetId' parameter")

// Check if Tower is configured
checkTowerEnabled()

log.debug "Fetching dataset: $datasetId (version: ${version ?: 'latest'}, file: ${fileName ?: 'data.csv'})"

final helper = new DatasetHelper()
final content = helper.downloadDataset(datasetId, version, fileName)

log.trace "Dataset content retrieved: ${content?.length() ?: 0} characters"

return content
}

/**
* Check if Tower is properly configured
*/
protected static void checkTowerEnabled() {
def session = Global.session as Session
if( !session ) {
log.warn "Session not initialized - Tower configuration cannot be validated"
return
}

// We don't require tower.enabled=true for fromDataset to work
// as long as the access token is available
def token = session.config?.navigate('tower.accessToken')
if( !token ) {
token = System.getenv('TOWER_ACCESS_TOKEN')
}

if( !token ) {
log.debug "Tower access token not found - fromDataset may fail"
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
moduleName=nf-tower
moduleVersion=1.0
extensionClasses=io.seqera.tower.plugin.TowerChannelExtension
Loading
Loading