# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html
from pimlico.utils.core import remove_duplicates
from pimlico.utils.logging import get_console_logger
if __name__ == "__main__":
from pimlico import install_core_dependencies
install_core_dependencies()
import argparse
import os
import sys
from operator import itemgetter
from traceback import print_exc, format_exception_only
from pimlico.cli.check import check_cmd, install_cmd, deps_cmd
from pimlico.cli.status import status_cmd
from pimlico.core.config import PipelineConfig, PipelineConfigParseError
from pimlico.core.modules.base import ModuleInfoLoadError
from pimlico.core.modules.execute import execute_module, ModuleExecutionError
from pimlico.utils.filesystem import copy_dir_with_progress
from .shell.runner import shell_cmd
from pimlico.core.modules.multistage import MultistageModuleInfo
[docs]def run_cmd(pipeline, opts):
debug = opts.debug
log = get_console_logger("Pimlico", debug=debug)
if opts.module_name is None:
# No module name given: default to next one that's ready to run
modules = [(module_name, pipeline[module_name]) for module_name in pipeline.modules]
ready_modules = [module_name for (module_name, module) in modules
if module.module_executable and not module.is_locked()
and module.status != "COMPLETE" and module.all_inputs_ready()]
if len(ready_modules) == 0:
print >>sys.stderr, "No modules not already completed have all their inputs ready: no module name to " \
"default to"
sys.exit(1)
else:
module_spec = ready_modules[0]
log.info("No module name specified. Defaulting to next unexecuted, ready module: '%s'" % module_spec)
else:
module_spec = opts.module_name
# In the case of a multi-stage module allow a list to be output of available stages
module_name, __, stage_name = module_spec.rpartition(":")
if stage_name in ["?", "help"]:
# Just output stage names and exit
module = pipeline[module_name]
if not isinstance(module, MultistageModuleInfo):
print "%s is not a multi-stage module" % module_name
sys.exit(1)
print "Module stages: %s" % ", ".join(stage.name for stage in module.stages)
sys.exit(0)
try:
execute_module(pipeline, module_spec, force_rerun=opts.force_rerun, debug=debug, log=log)
except ModuleInfoLoadError, e:
if debug:
print_exc()
if e.cause is not None:
print >>sys.stderr, "Caused by: %s" % "".join(format_exception_only(type(e.cause), e.cause)),
print >>sys.stderr, "Error loading module '%s': %s" % (module_spec, e)
except ModuleExecutionError, e:
if debug:
print >>sys.stderr, "Top-level error"
print >>sys.stderr, "---------------"
print_exc()
print_execution_error(e)
print >>sys.stderr, "Error executing module '%s': %s" % (module_spec, e)
except KeyboardInterrupt:
print >>sys.stderr, "Exiting before execution completed due to user interrupt"
# Raise the exception so we see the full stack trace
if debug:
raise
[docs]def print_execution_error(error):
print >>sys.stderr, "\nDetails of error"
print >>sys.stderr, "----------------"
print >>sys.stderr, "".join(format_exception_only(type(error), error)).strip("\n")
if hasattr(error, "debugging_info") and error.debugging_info is not None:
# Extra debugging information was provided by the exception
print >>sys.stderr, "\n## Further debugging info ##"
print >>sys.stderr, error.debugging_info.strip("\n")
if hasattr(error, "cause") and error.cause is not None:
# Recursively print any debugging info on the cause exception
print_execution_error(error.cause)
[docs]def reset_module(pipeline, opts):
if opts.module_name == "all":
# Reset every module, one by one
print "Resetting execution state of all modules"
pipeline.reset_all_modules()
else:
module_names = opts.module_name.split(",")
# Check for modules that depend on these ones: they should also be reset, since their input data will be rebuilt
dependent_modules = remove_duplicates(sum(
(pipeline.get_dependent_modules(module_name, recurse=True) for module_name in module_names), []
))
dependent_modules = [m for m in dependent_modules if m not in module_names]
# Don't bother to include ones that haven't been executed anyway
dependent_modules = [m for m in dependent_modules if pipeline[m].status != "UNEXECUTED"]
if len(dependent_modules) > 0:
# There are additional modules that we should reset along with these,
# but check with the user in case they didn't intend that
print "The following modules depend on %s. Their execution state will be reset too if you continue." % \
", ".join(module_names)
print " %s" % ", ".join(dependent_modules)
answer = raw_input("Do you want to continue? [y/N] ")
if answer.lower() != "y":
print "Cancelled"
return
for module_name in module_names + dependent_modules:
print "Resetting execution state of module %s" % module_name
module = pipeline[module_name]
module.reset_execution()
[docs]def list_variants(pipeline, opts):
# Main is the default pipeline config and is always available (but not included in this list)
variants = ["main"] + pipeline.available_variants
print "Available pipeline variants: %s" % ", ".join(variants)
print "Select one using the --variant option"
[docs]def short_to_long(pipeline, opts):
module_name = opts.module_name
# Get the path within the stores
module_path = pipeline[module_name].get_module_output_dir()
# Work out where it lives in the short-term and long-term stores
short_term_dir = os.path.join(pipeline.short_term_store, module_path)
long_term_dir = os.path.join(pipeline.long_term_store, module_path)
copy_dir_with_progress(short_term_dir, long_term_dir)
[docs]def unlock_cmd(pipeline, opts):
module = pipeline[opts.module_name]
if not module.is_locked():
print "Module '%s' is not locked" % opts.module_name
else:
module.unlock()
print "Module unlocked"
[docs]def browse_cmd_with_deps(*args):
# When this is first imported, it checks that it has its dependencies
from .browser.tool import browse_cmd
return browse_cmd(*args)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Main command line interface to PiMLiCo")
parser.add_argument("pipeline_config", help="Config file to load a pipeline from")
parser.add_argument("--debug", "-d", help="Output verbose debugging info", action="store_true")
parser.add_argument("--variant", "-v", help="Load a particular variant of a pipeline. For a list of available "
"variants, use the 'variants' command", default="main")
parser.add_argument("--override-local-config", "--local", "-l",
help="Override a parameter set in the local config files (usually ~/.pimlico.conf). For just "
"this execution. Specify as param=value. Use this option multiple times to override "
"more than one parameter", action="append")
parser.add_argument("--processes", "-p",
help="Set the number of processes to use for this run, where parallelization is available. "
"Overrides the local config setting. Equivalent to '-l processes=P'", type=int)
subparsers = parser.add_subparsers(help="Select a sub-command")
check = subparsers.add_parser("check",
help="Test a pipeline config to check that it can be parsed and is valid. It is "
"recommended that you run this before attempting to execute any modules")
check.set_defaults(func=check_cmd)
check.add_argument("modules", nargs="*",
help="Check runtime dependencies for named modules. By default, these are not all checked, as "
"you might be happy with them not all being satisfied at once")
check.add_argument("--verbose", "-v", action="store_true",
help="Show details of why dependencies aren't available where possible")
status = subparsers.add_parser("status", help="Output a module execution schedule for the pipeline and execution "
"status for every module")
status.add_argument("module_name", nargs="?",
help="Optionally specify a module name. More detailed status information will be outut for "
"this module")
status.add_argument("--all", "-a", action="store_true",
help="Show all modules defined in the pipeline, not just those that can be executed")
status.add_argument("--history", "-i", action="store_true",
help="When a module name is given, even more detailed output is given, including the full "
"execution history of the module")
status.set_defaults(func=status_cmd)
run = subparsers.add_parser("run", help="Execute an individual pipeline module")
run.set_defaults(func=run_cmd)
run.add_argument("module_name", help="The name of the module to run. To run a stage from a multi-stage module, "
"use 'module:stage'. Use 'status' command to see available modules. Use "
"'module:?' or 'module:help' to list available stages. If not given, defaults "
"to next incomplete module that has all its inputs ready", nargs="?")
run.add_argument("--force-rerun", "-f", action="store_true",
help="Force running the module, even if it's already been run to completion")
variants = subparsers.add_parser("variants", help="List the available variants of a pipeline config")
variants.set_defaults(func=list_variants)
reset = subparsers.add_parser("reset",
help="Delete any output from the given module and restore it to unexecuted state")
reset.set_defaults(func=reset_module)
reset.add_argument("module_name", help="The name of the module to reset, or multiple separated by commas, or "
"'all' to reset the whole pipeline")
unlock = subparsers.add_parser("unlock",
help="Forcibly remove an execution lock from a module. If a lock has ended up "
"getting left on when execution exited prematurely, use this to remove it. "
"Usually shouldn't be necessary, even if there's an error during execution")
unlock.set_defaults(func=unlock_cmd)
unlock.add_argument("module_name", help="The name of the module to unlock")
longstore = subparsers.add_parser("longstore",
help="Move a particular module's output from the short-term store to the long-term "
"store. It will still be found here by input readers. You might want to do "
"this if your long-term store is bigger, to keep down the short-term store size")
longstore.set_defaults(func=short_to_long)
longstore.add_argument("module_name", help="The name of the module whose output to move")
browse = subparsers.add_parser("browse", help="View the data output by a module")
browse.set_defaults(func=browse_cmd_with_deps)
browse.add_argument("module_name", help="The name of the module whose output to look at. Use 'module:stage' for "
"multi-stage modules")
browse.add_argument("output_name", nargs="?", help="The name of the output from the module to browse. If blank, "
"load the default output")
browse.add_argument("--raw", "-r", action="store_true",
help="Don't parse the data using the output datatype (i.e. just read the raw text). If not "
"set, we output the result of applying unicode() to the parsed data structure, or a "
"custom formatting if the datatype loaded defines one")
browse.add_argument("--formatter", "-f",
help="Fully qualified class name of a subclass of DocumentBrowserFormatter to use to determine "
"what to output for each document. If specified, --raw is ignored")
shell = subparsers.add_parser("shell", help="Open a shell to give access to the data output by a module")
shell.set_defaults(func=shell_cmd)
shell.add_argument("module_name", help="The name of the module whose output to look at")
shell.add_argument("output_name", nargs="?", help="The name of the output from the module to browse. If blank, "
"load the default output")
install = subparsers.add_parser("install", help="Install missing module library dependencies")
install.set_defaults(func=install_cmd)
install.add_argument("modules", nargs="*",
help="Check dependencies for named modules and install any that are automatically "
"installable. Use 'all' to install dependencies for all modules")
install.add_argument("--trust-downloaded", "-t", action="store_true",
help="If an archive file to be downloaded is found to be in the lib dir already, trust "
"that it is the file we're after. By default, we only reuse archives we've just "
"downloaded, so we know they came from the right URL, avoiding accidental name clashes")
deps = subparsers.add_parser("deps", help="List information about software dependencies: whether they're "
"available, versions, etc")
deps.set_defaults(func=deps_cmd)
deps.add_argument("modules", nargs="*",
help="Check dependencies for named modules and install any that are automatically "
"installable. Use 'all' to install dependencies for all modules")
opts = parser.parse_args()
if opts.override_local_config is not None:
override_local = dict(
itemgetter(0, 2)(param.partition("=")) for param in opts.override_local_config
)
else:
override_local = {}
if opts.processes is not None:
# Override the processes local config setting
override_local["processes"] = opts.processes
# Read in the pipeline config from the given file
try:
pipeline = PipelineConfig.load(opts.pipeline_config, variant=opts.variant,
override_local_config=override_local)
except PipelineConfigParseError, e:
print >>sys.stderr, "Error reading pipeline config: %s" % e
sys.exit(1)
# Run the function corresponding to the subcommand
opts.func(pipeline, opts)