# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html
import os
from operator import itemgetter
import colorama
from termcolor import colored
from pimlico.cli.subcommands import PimlicoCLISubcommand
from pimlico.cli.util import module_number_to_name
from pimlico.utils.format import title_box
class StatusCmd(PimlicoCLISubcommand):
    """
    The ``status`` subcommand.

    With no module argument (or with a range ``A...B``, or the base name of an
    expanded module) it prints one summary entry per module. With a single
    module name it prints the detailed report built by ``module_status()``.

    All output uses the single-argument, parenthesised form of ``print``, which
    behaves identically as a Python 2 print statement and a Python 3 call.
    """
    command_name = "status"
    command_help = "Output a module execution schedule for the pipeline and execution status for every module"

    def add_arguments(self, parser):
        """Register the command's positional argument and switches on ``parser``."""
        parser.add_argument("module_name", nargs="?",
                            help="Optionally specify a module name (or number). More detailed status information will "
                                 "be output for this module. Alternatively, use this arg to limit the modules whose "
                                 "status will be output to a range by specifying 'A...B', where A and B are module "
                                 "names or numbers")
        parser.add_argument("--all", "-a", action="store_true",
                            help="Show all modules defined in the pipeline, not just those that can be executed")
        parser.add_argument("--short", "-s", action="store_true",
                            help="Use a brief format when showing the full pipeline's status. Only applies when "
                                 "module names are not specified. This is useful with very large pipelines, where "
                                 "you just want a compact overview of the status")
        parser.add_argument("--history", "-i", action="store_true",
                            help="When a module name is given, even more detailed output is given, including the full "
                                 "execution history of the module")
        parser.add_argument("--deps-of", "-d",
                            help="Restrict to showing only the named/numbered module and any that are (transitive) "
                                 "dependencies of it. That is, show the whole tree of modules that lead through "
                                 "the pipeline to the given module")
        parser.add_argument("--no-color", "--nc", action="store_true",
                            help="Don't include terminal color characters, even if the terminal appears to support "
                                 "them. This can be useful if the automatic detection of color terminals doesn't work "
                                 "and the status command displays lots of horrible escape characters")

    def run_command(self, pipeline, opts):
        """
        Print the status report selected by ``opts`` for ``pipeline``.

        :param pipeline: the loaded pipeline whose modules are reported on
        :param opts: parsed command-line options (see ``add_arguments()``)
        :raises ValueError: if a range limit names a module that is not in the list being shown
        """
        # If the colour output has been disabled by a switch, use the standard env var to disable it
        if opts.no_color:
            os.environ["ANSI_COLORS_DISABLED"] = "1"
        # Use colorama to control termcolor so that it only outputs colours to the terminal
        colorama.init()
        try:
            # Main is the default pipeline config and is always available (but not included in this list)
            variants = ["main"] + pipeline.available_variants
            print("Available pipeline variants: %s" % ", ".join(variants))
            print("Showing status for '%s' variant" % pipeline.variant)

            module_sel, first_module, last_module = self._parse_selection(pipeline, opts.module_name)
            if module_sel is None:
                self._show_overview(pipeline, opts, first_module, last_module)
            else:
                self._show_details(pipeline, module_sel, opts.history)
        finally:
            # Always undo colorama's stream wrapping, including on early return or error
            colorama.deinit()

    @staticmethod
    def _parse_selection(pipeline, module_sel):
        """
        Interpret the positional module argument.

        Returns ``(module_sel, first_module, last_module)``. ``module_sel`` is
        None whenever the overview should be shown, in which case the other two
        values are the (optional) limits of the range of modules to list.
        """
        first_module = last_module = None
        if module_sel is not None:
            if "..." in module_sel:
                # A module range specifier was given to limit the modules shown
                first_module, __, last_module = module_sel.partition("...")
                # Allow module numbers to be given; an empty side leaves that end of the range open
                first_module = module_number_to_name(pipeline, first_module) if first_module else None
                last_module = module_number_to_name(pipeline, last_module) if last_module else None
                # Show the non-detailed version, since we're selecting a range, not just one
                module_sel = None
            elif module_sel in pipeline.expanded_modules:
                # If an expanded module's base name is specified, treat it as a range covering all the modules
                expanded = pipeline.expanded_modules[module_sel]
                first_module, last_module = expanded[0], expanded[-1]
                module_sel = None
        return module_sel, first_module, last_module

    @staticmethod
    def _show_overview(pipeline, opts, first_module, last_module):
        """Print a summary entry for every selected module, in brief or normal format."""
        available_module_names = pipeline.modules
        if opts.all:
            # Show all modules, not just those that can be executed
            print("\nAll modules in pipeline with statuses:")
            module_names = list(pipeline.modules)
            bullets = ["-"] * len(module_names)
        else:
            # Try deriving a schedule and output it, numbering the modules from 1
            schedule = [("%d." % i, name) for i, name in enumerate(pipeline.get_module_schedule(), start=1)]
            if not schedule:
                print("\nPipeline loaded successfully, but it does not contain any modules")
                return
            # If the --deps-of option is given, filter modules shown to only those that lead to the given one
            if opts.deps_of is not None:
                dest_module = module_number_to_name(pipeline, opts.deps_of)
                print("\nRestricting status view to dependencies of module '%s'" % dest_module)
                include_mods = set([dest_module] + pipeline[dest_module].get_transitive_dependencies())
                schedule = [(bullet, name) for (bullet, name) in schedule if name in include_mods]
            # Split into parallel lists. Unlike zip(*schedule), this also copes with the
            # filter above having removed every entry
            bullets = [bullet for (bullet, __) in schedule]
            module_names = [name for (__, name) in schedule]

            # Fall back to "all" mode if a specific module has been requested that's not in execution schedule
            first_unscheduled = first_module is not None and first_module not in module_names \
                and first_module in available_module_names
            last_unscheduled = last_module is not None and last_module not in module_names \
                and last_module in available_module_names
            if first_unscheduled or last_unscheduled:
                module_names = list(pipeline.modules)
                bullets = ["-"] * len(module_names)
            else:
                print("\nModule execution schedule with statuses:")

        # Allow the range of modules to be filtered
        if first_module is not None:
            # Start at the given module
            try:
                first_mod_idx = module_names.index(first_module)
            except ValueError:
                raise ValueError("tried to limit module list by '%s': no such module" % first_module)
            bullets = bullets[first_mod_idx:]
            module_names = module_names[first_mod_idx:]
        if last_module is not None:
            # End at the given module (inclusive)
            try:
                last_mod_idx = module_names.index(last_module)
            except ValueError:
                raise ValueError("tried to limit module list by '%s': no such module" % last_module)
            bullets = bullets[:last_mod_idx + 1]
            module_names = module_names[:last_mod_idx + 1]

        if opts.short:
            # Show super-short version of the status: group module names by status
            status_lists = {}
            for bullet, module_name in zip(bullets, module_names):
                status = pipeline[module_name].status
                status_lists.setdefault(status, []).append("%s %s" % (bullet, module_name))
            for status in sorted(status_lists):
                print("\n%s:" % status)
                print("\n".join(status_lists[status]))
        else:
            for bullet, module_name in zip(bullets, module_names):
                # Short summary for each module
                module = pipeline[module_name]
                print(status_colored(module, " %s %s" % (bullet, module_name)))
                # Show the type of the module
                print(" type: %s" % module.module_type_name)
                # Check module status (has it been run?)
                status_text = module.status if module.module_executable else "not executable"
                print(" status: %s" % status_colored(module, status_text))
                # Check status of each input datatype
                for input_name in module.input_names:
                    if module.input_ready(input_name):
                        readiness = colored("ready", "green")
                    else:
                        readiness = colored("not ready", "red")
                    print(" input %s: %s" % (input_name, readiness))
                # Output names are coloured by whether their data is ready
                print(" outputs: %s" % ", ".join([
                    colored(name, "green") if module.get_output(name).data_ready() else colored(name, "red")
                    for name in module.output_names
                ]))
                if module.is_locked():
                    print(" locked: ongoing execution")

    @staticmethod
    def _show_details(pipeline, module_sel, show_history):
        """
        Print the detailed report for one module, followed by reports for any
        further modules that ``module_status()`` asks to have shown.
        """
        # Resolve a module number to a name, as is done for range limits and --deps-of,
        # so that the single-module form accepts numbers as the help text says
        to_output = [module_number_to_name(pipeline, module_sel)]
        already_output = set()
        while to_output:
            module_name = to_output.pop()
            if module_name in already_output:
                # Each module is reported at most once, however many times it is requested
                continue
            module = pipeline[module_name]
            status, more_outputs = module_status(module)
            # Output the module's detailed status
            print(status)
            if show_history:
                # Also output full execution history
                print("\nFull execution history:")
                print(module.execution_history)
            already_output.add(module_name)
            # Allow this module to request that we output further modules
            to_output.extend(more_outputs)
def module_status_color(module):
    """
    Choose the terminal colour name used to display the given module.

    - Non-executable module: green if all of its inputs are ready, otherwise red.
    - Status ``"COMPLETE"``: green.
    - Status ``"UNEXECUTED"``: yellow if all inputs are ready, otherwise red.
    - Any other status: cyan.

    :param module: object providing ``module_executable``, ``status`` and ``all_inputs_ready()``
    :return: one of ``"green"``, ``"red"``, ``"yellow"`` or ``"cyan"``
    """
    if not module.module_executable:
        return "green" if module.all_inputs_ready() else "red"
    if module.status == "COMPLETE":
        return "green"
    if module.status == "UNEXECUTED":
        # If the module's not been started, but its inputs are ready, use yellow
        return "yellow" if module.all_inputs_ready() else "red"
    # All other cases are cyan -- usually partial completion, ongoing execution, etc
    return "cyan"
def status_colored(module, text=None):
    """
    Colour the text according to the status of the given module. If text is not given, the module's name is
    returned (coloured in the same way).

    :param module: module whose status determines the colour (see ``module_status_color()``)
    :param text: text to colour; None means use ``module.module_name``
    :return: the text wrapped by termcolor's ``colored()``
    """
    # Only substitute the name when no text was supplied at all, so that an explicitly
    # passed empty string is not silently replaced
    if text is None:
        text = module.module_name
    return colored(text, module_status_color(module))
def module_status(module):
    """
    Detailed module status, shown when a specific module's status is requested.

    :param module: the module to report on
    :return: pair ``(status_text, also_output)``. ``status_text`` is the formatted, multi-line
        report. ``also_output`` is a list of names of further modules whose status should be
        shown as well: when ``module`` is a filter, the modules its inputs come from.
    """
    # NOTE(review): the whitespace inside the report strings below is reproduced exactly as
    # found; it may originally have used deeper indentation -- confirm against the
    # intended output layout
    also_output = []
    status_color = module_status_color(module)
    is_filter = module.is_filter()

    # Put together information about the inputs
    input_infos = []
    for input_name in module.input_names:
        connections = zip(module.get_input(input_name, always_list=True),
                          module.get_input_module_connection(input_name, always_list=True))
        # The third element of each connection (additional names) is not used in the report
        for input_datatype, (input_module, input_module_output, __) in connections:
            input_ready = module.input_ready(input_name)
            corpus_dir = input_datatype.absolute_base_dir or "not available yet"
            # Format all the information about this input
            input_info = (
                "Input {input_name}:\n"
                "{status}\n"
                "From module: {input_module} ({input_module_output} output)\n"
                "Datatype: {datatype_name}"
            ).format(
                input_name=input_name,
                status=colored("Data ready", "green") if input_ready else colored("Data not ready", "red"),
                input_module=input_module.module_name,
                input_module_output=input_module_output or "default",
                datatype_name=input_datatype.full_datatype_name(),
            )
            if input_module.module_executable:
                # Executable module: if it's been executed, we get data from there
                if input_ready:
                    input_info += "\n Stored in: {corpus_dir}".format(corpus_dir=corpus_dir)
            elif input_module.is_filter():
                input_info += "\n Input module is a filter"
            else:
                # Input module
                input_info += "\n Pipeline input"

            if input_datatype.data_ready():
                # Get additional detailed information from the datatype instance
                datatype_details = input_datatype.get_detailed_status()
                if datatype_details:
                    # Indent the lines
                    input_info = "%s\n%s" % (input_info, "\n".join(" %s" % line for line in datatype_details))
            input_infos.append(input_info)

            # If filter module: output further information about where it gets its inputs from
            if is_filter:
                also_output.append(input_module.module_name)

    # Do the same thing for the outputs
    output_infos = []
    for output_name in module.output_names:
        output_datatype = module.get_output(output_name)
        output_ready = output_datatype.data_ready()
        if is_filter:
            corpus_dir = "filter module, output not stored"
        elif output_datatype.base_dir is None:
            # A None base_dir indicates that the dir in the Pimlico storage is not required
            # This happens with input datatypes that require no preparation
            corpus_dir = "nothing to be stored"
        else:
            corpus_dir = output_datatype.absolute_base_dir or "not available yet"
        output_info = (
            "Output {output_name}:\n"
            "{status}\n"
            "Datatype: {output_datatype}\n"
            "Stored in: {corpus_dir}"
        ).format(
            output_name=output_name,
            status=colored("Data available", "green") if output_ready else colored("Data not available", "red"),
            output_datatype=output_datatype.full_datatype_name(),
            corpus_dir=corpus_dir,
        )
        if output_ready:
            # Get additional detailed information from the datatype instance
            datatype_details = output_datatype.get_detailed_status()
            if datatype_details:
                # Indent the lines
                output_info = "%s\n%s" % (output_info, "\n".join(" %s" % line for line in datatype_details))
        output_infos.append(output_info)

    # Get additional detailed information from the module instance
    module_details = module.get_detailed_status()
    module_details = "\n%s" % "\n".join(module_details) if module_details else ""

    docstring = "%s\n" % module.docstring if module.docstring else ""

    if module.module_readable_name:
        type_text = "%s -- %s" % (module.module_type_name, module.module_readable_name)
    else:
        type_text = module.module_type_name

    if module.module_executable:
        status_text = colored(module.status, status_color)
    else:
        status_text = colored("not executable", "green")

    # Put together a neat summary, include the things we've formatted above
    summary = (
        "\n"
        "{title}\n"
        "{docstring}Status: {status}\n"
        "Type: {type}\n"
        "{inputs}\n"
        "{outputs}{lock_status}\n"
        "Options:\n"
        "{options}\n"
        "Module variables:\n"
        "{modvars}{module_details}"
    ).format(
        title=colored(title_box("Module: %s" % module.module_name), status_color),
        status=status_text,
        inputs="\n".join(input_infos) if input_infos else "No inputs",
        outputs="\n".join(output_infos) if output_infos else "No outputs",
        options="\n ".join("%s: %s" % (key, val) for (key, val) in module.options.items()),
        module_details=module_details,
        lock_status="\nLocked: ongoing execution" if module.is_locked() else "",
        docstring=docstring,
        type=type_text,
        modvars="\n ".join("%s: %s" % (var, val) for (var, val) in module.module_variables.items()),
    )
    return summary, also_output