sagetasks.nextflowtower package

Submodules

sagetasks.nextflowtower.client module

class sagetasks.nextflowtower.client.TowerClient(tower_token: Optional[str] = None, tower_api_url: Optional[str] = None, debug_mode: bool = False)[source]

Bases: object

get_valid_name(full_name: str) str[source]

Generate Tower-friendly name from full name

Parameters

full_name (str) – Full name (with spaces/punctuation)

Returns

Name with only alphanumeric, dash and underscore characters

Return type

str

paged_request(method: str, endpoint: str, **kwargs) Iterator[dict][source]

Iterate through pages of results for a given request

Parameters
  • method (str) – An HTTP method (GET, PUT, POST, or DELETE)

  • endpoint (str) – The API endpoint with the path parameters filled in

  • **kwargs – Additional named arguments passed through to requests.request().

Returns

An iterator traversing through pages of responses

Return type

Iterator[Dict]

request(method: str, endpoint: str, **kwargs) dict[source]

Make an authenticated HTTP request to the Nextflow Tower API

Parameters
  • method (str) – An HTTP method (GET, PUT, POST, or DELETE)

  • endpoint (str) – The API endpoint with the path parameters filled in

  • **kwargs – Additional named arguments passed through to requests.request().

Returns

The raw Response object to allow for special handling

Return type

Response

sagetasks.nextflowtower.general module

sagetasks.nextflowtower.general.launch_workflow(compute_env_id: str, pipeline: str, workspace_id=None, revision: Optional[str] = None, params_yaml: Optional[str] = None, params_json: Optional[str] = None, nextflow_config: Optional[str] = None, run_name: Optional[str] = None, work_dir: Optional[str] = None, profiles: Optional[List[str]] = None, user_secrets: Optional[List[str]] = None, workspace_secrets: Optional[List[str]] = None, pre_run_script: Optional[str] = None, client_args=None)[source]

Launch a workflow run on Nextflow Tower.

You can provide your Tower credentials with the following environment variables:

  • NXF_TOWER_TOKEN=’<tower-access-token>’

  • NXF_TOWER_API_URL=’<tower-api-url>’

You can optionally enable debug mode (HTTP request logs) with the following environment variable:

  • NXF_TOWER_DEBUG=1

sagetasks.nextflowtower.typer module

sagetasks.nextflowtower.utils module

class sagetasks.nextflowtower.utils.TowerUtils(client_args: Mapping, workspace_id: Optional[int] = None)[source]

Bases: object

static bundle_client_args(auth_token: Optional[str] = None, platform: Optional[str] = 'sage', endpoint: Optional[str] = None, **kwargs) dict[source]

Bundle the information needed for authenticating a Tower client.

Parameters
  • auth_token (str, optional) – Tower access token, which gets included in the HTTP header as an Authorization Bearer token. Defaults to None, which prompts the use of the NXF_TOWER_TOKEN environment variable.

  • platform (str, optional) – Compact identifier for commonly used platforms to populate the endpoint. Options include “sage” and “tower.nf”. Defaults to “sage”.

  • endpoint (str, optional) – Full Tower API URL. This argument will override the value associated with the specified platform. Defaults to None.

Raises
  • ValueError – If platform is not valid.

  • ValueError – If platform and endpoint are not provided.

Returns

Bundle of Tower client arguments.

Return type

dict

close_workspace() None[source]

Clear default workspace for workspace-related requests.

get_compute_env(compute_env_id: str) dict[source]

Retrieve information about a given compute environment.

Parameters

compute_env_id (str) – Compute environment alphanumerical ID.

Returns

Information about the compute environment.

Return type

dict

get_workflow(workflow_id: str) dict[source]

Retrieve information about a given workflow run.

Parameters

workflow_id (str) – Workflow run alphanumerical ID.

Returns

Information about the workflow run.

Return type

dict

init_launch_workflow_data(compute_env_id: str) dict[source]

Initialize request for /workflow/launch endpoint.

You can use this method to modify the contents before passing the adjusted payload to the init_data argument on the launch_workflow() method.

Parameters

compute_env_id (str) – Compute environment alphanumerical ID.

Raises

ValueError – If the compute environment is not available.

Returns

Initial request for /workflow/launch endpoint.

Return type

dict

init_params() dict[source]

Initialize query-string parameters with workspace ID.

Returns

Parameters for workspace-related requests,

which can be passed to requests.request.

Return type

dict

launch_workflow(compute_env_id: str, pipeline: str, revision: Optional[str] = None, params_yaml: Optional[str] = None, params_json: Optional[str] = None, nextflow_config: Optional[str] = None, run_name: Optional[str] = None, work_dir: Optional[str] = None, profiles: Optional[List[str]] = (), user_secrets: Optional[List[str]] = (), workspace_secrets: Optional[List[str]] = (), pre_run_script: Optional[str] = None, init_data: Optional[Mapping] = None) dict[source]

Launch a workflow using the given compute environment.

This method will use any opened workspace if available.

Parameters
  • compute_env_id (str) – Compute environment ID where the execution will be launched.

  • pipeline (str) – Nextflow pipeline URL. This can be a GitHub shorthand like nf-core/rnaseq.

  • revision (str, optional) – A valid repository commit ID (SHA), tag, or branch name. Defaults to None.

  • params_yaml (str, optional) – Pipeline parameters in YAML format. Defaults to None.

  • params_json (str, optional) – Pipeline parameters in JSON format. Defaults to None.

  • nextflow_config (str, optional) – Additional Nextflow configuration settings can be provided here. Defaults to None.

  • run_name (str, optional) – Custom workflow run name. Defaults to None, which will automatically assign a random run name.

  • work_dir (str, optional) – The bucket path where the pipeline scratch data is stored. Defaults to None, which uses the default work directory for the given compute environment.

  • profiles (List[str], optional) – Configuration profile names to use for this execution. Defaults to an empty list.

  • user_secrets (List[str], optional) – Secrets required by the pipeline execution. Those secrets must be defined in the launching user’s account. User secrets take precedence over workspace secrets. Defaults to an empty list.

  • workspace_secrets (List[str], optional) – Secrets required by the pipeline execution. Those secrets must be defined in the opened workspace. Defaults to an empty list.

  • pre_run_script (str, optional) – A Bash script that’s executed in the same environment where Nextflow runs just before the pipeline is launched. Defaults to None.

  • init_data (Mapping, optional) – An alternate request payload for launching a workflow. It’s recommended to generate a basic request using init_launch_workflow_data() and modifying it before passing it to init_data. Defaults to None.

Returns

Information about the just-launched workflow run.

Return type

dict

open_workspace(workspace_id: Optional[int]) None[source]

Configure default workspace for workspace-related requests.

Parameters

workspace_id (Optional[int]) – Tower workspace identifier. If None, this will close any opened workspace.

property workspace: int

Retrieve default workspace for workspace-related requests.

Raises

ValueError – If a workspace isn’t open. You can open a workspace when initializing TowerUtils() or with open_workspace().

Returns

Tower workspace identifier.

Return type

int

Module contents

Collection of Nextflow Tower-related Prefect tasks