matflow.Workflow#
- class matflow.Workflow(workflow_ref, store_fmt=None, fs_kwargs=None, **kwargs)#
Bases:
Workflow
Methods
Abort the currently running action-run of the specified task/element.
Add a loop to a subset of workflow tasks.
Add a new task after the specified task.
Add a new task before the specified task.
A context manager that batches up structural changes to the workflow and commits them to disk all together when the context manager exits.
Cancel any running jobscripts.
Check if a loop should terminate, given the specified completed run, and if so, set downstream iteration runs to be skipped.
Copy the workflow to a new path and return the copied workflow path.
Generate from a JSON file.
Generate from a JSON string.
Generate from a YAML file.
Generate from a YAML string.
Generate from either a YAML or JSON file, depending on the file extension.
Generate from a WorkflowTemplate object.
Generate from the data associated with a WorkflowTemplate object.
Get EAR IDs belonging to multiple tasks
Check if an EAR is to be skipped.
Return element action run objects from a list of IDs.
Get EARs belonging to multiple tasks
Retrieve all workflow parameter data.
Retrieve all store parameters.
Retrieve all store parameters.
Return element iteration objects from a list of IDs.
Get element iterations belonging to multiple tasks
Return element objects from a list of IDs.
Retrieve the run IDs of those runs that correspond to the final action within a named loop iteration.
Retrieve elements that are running according to the scheduler.
Retrieve runs that are running according to the scheduler.
Return the unique names of all workflow tasks.
Process the shell stdout/stderr stream according to the associated Command object.
Rechunk metadata/runs and parameters/base arrays.
Reload the workflow from disk.
Set the end time and exit code on an EAR.
Record that an EAR is to be skipped due to an upstream failure or loop termination condition being met.
Set the start time on an EAR.
Set the submission index of an EAR.
Set ElementIteration.EARs_initialised to True for the specified iteration.
Submit the workflow for execution.
Rename an existing same-path workflow (directory) so we can restore it if workflow creation fails.
- param path:
Path at which to create the new unzipped workflow. If this is an existing
Wait for the completion of specified/all submitted jobscripts.
Write run-time commands for a given EAR.
- param path:
Path at which to create the new zipped workflow. If this is an existing
Attributes
The workflow name may be different from the template name, as it includes the creation date-timestamp if generated.
Get an fsspec URL for this workflow.
- Parameters:
- abort_run(submission_idx=-1, task_idx=None, task_insert_ID=None, element_idx=None)#
Abort the currently running action-run of the specified task/element.
- add_loop(loop)#
Add a loop to a subset of workflow tasks.
- Parameters:
loop (Loop) –
- Return type:
None
- add_submission(tasks=None, JS_parallelism=None)#
- Parameters:
- Return type:
- add_task(task, new_index=None)#
- add_task_after(new_task, task_ref=None)#
Add a new task after the specified task.
- add_task_before(new_task, task_ref=None)#
Add a new task before the specified task.
- app = App(name='MatFlow', version='0.3.0a129')#
- property artifacts_path#
- batch_update(is_workflow_creation=False)#
A context manager that batches up structural changes to the workflow and commits them to disk all together when the context manager exits.
- cancel(hard=False)#
Cancel any running jobscripts.
- check_loop_termination(loop_name, run_ID)#
Check if a loop should terminate, given the specified completed run, and if so, set downstream iteration runs to be skipped.
- check_parameters_exist(id_lst)#
- copy(path=None)#
Copy the workflow to a new path and return the copied workflow path.
- Return type:
- property creation_info#
- delete()#
- property execution_path#
- classmethod from_JSON_file(JSON_path, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, variables=None, status=None)#
Generate from a JSON file.
- Parameters:
JSON_path (PathLike) – The path to a workflow template in the JSON file format.
path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str | None) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (Dict | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
variables (Dict[str, str] | None) – String variables to substitute in the file given by JSON_path.
status (Any | None) –
- Return type:
- classmethod from_JSON_string(JSON_str, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, variables=None, status=None)#
Generate from a JSON string.
- Parameters:
JSON_str (PathLike) – The JSON string containing a workflow template parametrisation.
path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str | None) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (Dict | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
variables (Dict[str, str] | None) – String variables to substitute in the string JSON_str.
status (Any | None) –
- Return type:
- classmethod from_YAML_file(YAML_path, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, variables=None)#
Generate from a YAML file.
- Parameters:
YAML_path (PathLike) – The path to a workflow template in the YAML file format.
path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str | None) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (Dict | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
variables (Dict[str, str] | None) – String variables to substitute in the file given by YAML_path.
- Return type:
- classmethod from_YAML_string(YAML_str, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, variables=None)#
Generate from a YAML string.
- Parameters:
YAML_str (PathLike) – The YAML string containing a workflow template parametrisation.
path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str | None) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (Dict | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
variables (Dict[str, str] | None) – String variables to substitute in the string YAML_str.
- Return type:
- classmethod from_file(template_path, template_format=None, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, variables=None, status=None)#
Generate from either a YAML or JSON file, depending on the file extension.
- Parameters:
template_path (PathLike) – The path to a template file in YAML or JSON format, and with a “.yml”, “.yaml”, or “.json” extension.
template_format (str | None) – If specified, one of “json” or “yaml”. This forces parsing from a particular format regardless of the file extension.
path (str | None) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str | None) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (Dict | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
variables (Dict[str, str] | None) – String variables to substitute in the file given by template_path.
status (Any | None) –
- Return type:
- classmethod from_template(template, path=None, name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None, status=None)#
Generate from a WorkflowTemplate object.
- Parameters:
template (WorkflowTemplate) – The WorkflowTemplate object to make persistent.
path (PathLike | None) – The directory in which the workflow will be generated. The current directory if not specified.
name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified the WorkflowTemplate name will be used, in combination with a date-timestamp.
overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str | None) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (Dict | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
status (Any | None) –
- Return type:
- classmethod from_template_data(template_name, tasks=None, loops=None, resources=None, path=None, workflow_name=None, overwrite=False, store='zarr', ts_fmt=None, ts_name_fmt=None, store_kwargs=None)#
Generate from the data associated with a WorkflowTemplate object.
- Parameters:
template_name (str) – Name of the new workflow template, from which the new workflow will be generated.
tasks (List[Task] | None) – List of Task objects to add to the new workflow.
loops (List[Loop] | None) – List of Loop objects to add to the new workflow.
resources (Dict[str, Dict] | None) – Mapping of action scopes to resource requirements, to be applied to all element sets in the workflow. resources specified in an element set take precedence of those defined here for the whole workflow.
path (PathLike | None) – The directory in which the workflow will be generated. The current directory if not specified.
workflow_name (str | None) – The name of the workflow. If specified, the workflow directory will be path joined with name. If not specified template_name will be used, in combination with a date-timestamp.
overwrite (bool | None) – If True and the workflow directory (path + name) already exists, the existing directory will be overwritten.
store (str | None) – The persistent store to use for this workflow.
ts_fmt (str | None) – The datetime format to use for storing datetimes. Datetimes are always stored in UTC (because Numpy does not store time zone info), so this should not include a time zone name.
ts_name_fmt (str | None) – The datetime format to use when generating the workflow name, where it includes a timestamp.
store_kwargs (Dict | None) – Keyword arguments to pass to the store’s write_empty_workflow method.
- Return type:
- get_EAR_IDs_of_tasks(id_lst)#
Get EAR IDs belonging to multiple tasks
- get_EAR_skipped(EAR_ID)#
Check if an EAR is to be skipped.
- Parameters:
EAR_ID (int) –
- Return type:
None
- get_EARs_from_IDs(id_lst)#
Return element action run objects from a list of IDs.
- Parameters:
- Return type:
- get_EARs_of_tasks(id_lst)#
Get EARs belonging to multiple tasks
- Parameters:
- Return type:
- get_all_EARs()#
- Return type:
- get_all_element_iterations()#
- Return type:
- get_all_parameter_data(**kwargs)#
Retrieve all workflow parameter data.
- get_all_parameter_sources(**kwargs)#
Retrieve all store parameters.
- get_all_parameters(**kwargs)#
Retrieve all store parameters.
- Parameters:
kwargs (Dict) –
- Return type:
List[AnySParameter]
- get_element_iteration_IDs_from_EAR_IDs(id_lst)#
- get_element_iterations_from_IDs(id_lst)#
Return element iteration objects from a list of IDs.
- Parameters:
- Return type:
- get_element_iterations_of_tasks(id_lst)#
Get element iterations belonging to multiple tasks
- Parameters:
- Return type:
- get_elements_from_IDs(id_lst)#
Return element objects from a list of IDs.
- get_iteration_final_run_IDs(loop_map=None, id_lst=None)#
Retrieve the run IDs of those runs that correspond to the final action within a named loop iteration.
These runs represent the final action of a given element-iteration; this is used to identify which commands file to append a loop-termination check to.
- get_iteration_task_pathway(ret_iter_IDs=False, ret_data_idx=False)#
- get_parameter(index, **kwargs)#
- Parameters:
index (int) –
kwargs (Dict) –
- Return type:
AnySParameter
- get_parameters(id_lst, **kwargs)#
- Parameters:
id_lst (Iterable[int]) –
kwargs (Dict) –
- Return type:
List[AnySParameter]
- get_running_elements(submission_idx=-1, task_idx=None, task_insert_ID=None)#
Retrieve elements that are running according to the scheduler.
- get_running_runs(submission_idx=-1, task_idx=None, task_insert_ID=None, element_idx=None)#
Retrieve runs that are running according to the scheduler.
- get_store_element_iterations(id_lst)#
- Parameters:
id_lst (Iterable[int]) –
- Return type:
List[AnySElementIter]
- get_task_elements(task, idx_lst=None)#
- get_task_unique_names(map_to_insert_ID=False)#
Return the unique names of all workflow tasks.
- property id_#
- property input_files_path#
- property loops: WorkflowLoopList#
- property name#
The workflow name may be different from the template name, as it includes the creation date-timestamp if generated.
- property num_EARs#
- property num_element_iterations#
- property num_elements#
- property num_submissions#
- property num_tasks#
- process_shell_parameter_output(name, value, EAR_ID, cmd_idx, stderr=False)#
Process the shell stdout/stderr stream according to the associated Command object.
- rechunk(chunk_size=None, backup=True, status=True)#
Rechunk metadata/runs and parameters/base arrays.
- rechunk_parameter_base(chunk_size=None, backup=True, status=True)#
- rechunk_runs(chunk_size=None, backup=True, status=True)#
- reload()#
Reload the workflow from disk.
- set_EAR_end(js_idx, js_act_idx, EAR_ID, exit_code)#
Set the end time and exit code on an EAR.
If the exit code is non-zero, also set all downstream dependent EARs to be skipped. Also save any generated input/output files.
- set_EAR_skip(EAR_ID)#
Record that an EAR is to be skipped due to an upstream failure or loop termination condition being met.
- Parameters:
EAR_ID (int) –
- Return type:
None
- set_EAR_submission_index(EAR_ID, sub_idx)#
Set the submission index of an EAR.
- set_EARs_initialised(iter_ID)#
Set ElementIteration.EARs_initialised to True for the specified iteration.
- Parameters:
iter_ID (int) –
- set_parameter_value(param_id, value, commit=False)#
- show_all_EAR_statuses()#
- property store_format#
- property submissions: List[Submission]#
- property submissions_path#
- submit(ignore_errors=False, JS_parallelism=None, print_stdout=False, wait=False, add_to_known=True, return_idx=False, tasks=None, cancel=False, status=True)#
Submit the workflow for execution.
- Parameters:
ignore_errors (bool | None) – If True, ignore jobscript submission errors. If False (the default) jobscript submission will halt when a jobscript fails to submit.
JS_parallelism (bool | None) – If True, allow multiple jobscripts to execute simultaneously. Raises if set to True but the store type does not support the jobscript_parallelism feature. If not set, jobscript parallelism will be used if the store type supports it.
print_stdout (bool | None) – If True, print any jobscript submission standard output, otherwise hide it.
wait (bool | None) – If True, this command will block until the workflow execution is complete.
add_to_known (bool | None) – If True, add the submitted submissions to the known-submissions file, which is used by the show command to monitor current and recent submissions.
return_idx (bool | None) – If True, return a dict representing the jobscript indices submitted for each submission.
tasks (List[int] | None) – List of task indices to include in the new submission if no submissions already exist. By default all tasks are included if a new submission is created.
cancel (bool | None) – Immediately cancel the submission. Useful for testing and benchmarking.
status (bool | None) – If True, display a live status to track submission progress.
- Return type:
- property task_artifacts_path#
- property tasks: WorkflowTaskList#
- property template: WorkflowTemplate#
- classmethod temporary_rename(path, fs)#
Rename an existing same-path workflow (directory) so we can restore it if workflow creation fails.
Renaming will occur until the successfully completed. This means multiple new paths may be created, where only the final path should be considered the successfully renamed workflow. Other paths will be deleted.
- property ts_fmt#
- property ts_name_fmt#
- unzip(path='.', log=None)#
- Parameters:
path – Path at which to create the new unzipped workflow. If this is an existing directory, the new workflow directory will be created within this directory. Otherwise, this path will represent the new workflow directory path.
- Return type:
- property url#
Get an fsspec URL for this workflow.
- wait(sub_js=None)#
Wait for the completion of specified/all submitted jobscripts.
- Parameters:
sub_js (Dict | None) –
- write_commands(submission_idx, jobscript_idx, JS_action_idx, EAR_ID)#
Write run-time commands for a given EAR.
- zip(path='.', log=None, overwrite=False, include_execute=False, include_rechunk_backups=False)#
- Parameters:
path – Path at which to create the new zipped workflow. If this is an existing directory, the zip file will be created within this directory. Otherwise, this path is assumed to be the full file path to the new zip file.
- Return type: