Source code for pimlico.cli.locations
# This file is part of Pimlico
# Copyright (C) 2020 Mark Granroth-Wilding
# Licensed under the GNU LGPL v3.0 - https://www.gnu.org/licenses/lgpl-3.0.en.html
from __future__ import print_function
from builtins import zip
import os
import sys
from tabulate import tabulate
from pimlico.cli.subcommands import PimlicoCLISubcommand
from pimlico.utils.core import remove_duplicates
from pimlico.utils.filesystem import move_dir_with_progress
[docs]class InputsCmd(PimlicoCLISubcommand):
command_name = "inputs"
command_help = "Show the locations of the inputs of a given module. If the input datasets " \
"are available, their actual location is shown. Otherwise, all directories " \
"in which the data is being checked for are shown"
command_desc = "Show the (expected) locations of the inputs of a given module"
[docs] def add_arguments(self, parser):
parser.add_argument("module_name", help="The name (or number) of the module to display input locations for")
[docs] def run_command(self, pipeline, opts):
module_name = opts.module_name
print("Input locations for module '%s'" % module_name)
try:
module = pipeline[module_name]
except KeyError:
print("Error: module '{}' does not exist".format(module_name), file=sys.stderr)
sys.exit(1)
# Display info for each input to this module
for input_name in module.input_names:
print("\nInput '%s'" % input_name)
input_setup_lst = module.get_input_reader_setup(input_name, always_list=True)
input_connections = module.get_input_module_connection(input_name, always_list=True)
multiple_inputs = len(input_setup_lst) > 1
if multiple_inputs:
print("Multiple input sources: showing locations for all")
for i, (input_setup, (prev_module, prev_output_name)) in enumerate(zip(input_setup_lst, input_connections)):
if multiple_inputs:
print("## Input source %d ##" % i)
input_datatype = input_setup.datatype
if input_setup.ready_to_read():
# Get the input reader
input_reader = input_setup(pipeline, module_name)
if hasattr(input_reader, "base_dir") and input_reader.base_dir is not None:
print("Data (%s) available in:" % input_datatype.full_datatype_name())
print(" - %s" % input_reader.base_dir)
else:
print("Data (%s) available, but no directory given: perhaps a filter datatype" % \
input_datatype.full_datatype_name())
else:
if hasattr(input_setup, "data_paths") and len(input_setup.data_paths) > 0:
print("Data not available. Data will be found in any of the following locations:")
# Get the relative dir within the Pimlico dir structures
rel_path = prev_module.get_output_dir(prev_output_name)
# Resolve this to all possible absolute dirs (usually two)
abs_paths = remove_duplicates([path for (name, path) in pipeline.get_data_search_paths(rel_path)])
print("\n".join(" - %s" % pth for pth in abs_paths))
else:
print("Data not available. No possible data locations available, perhaps a filter datatype")
[docs]class OutputCmd(PimlicoCLISubcommand):
command_name = "output"
command_help = "Show the location where the given module's output data will be (or has been) stored"
[docs] def add_arguments(self, parser):
parser.add_argument("module_name", help="The name (or number) of the module to display input locations for")
[docs] def run_command(self, pipeline, opts):
module_name = opts.module_name
try:
module = pipeline[module_name]
except KeyError:
print("Error: module '{}' does not exist".format(module_name), file=sys.stderr)
sys.exit(1)
# Get the output dir for the module
module_output_dir = module.get_module_output_dir(absolute=True)
print("Output location for module '%s': %s" % (module_name, module_output_dir))
[docs]class ListStoresCmd(PimlicoCLISubcommand):
command_name = "stores"
command_help = "List Pimlico stores in use and the corresponding storage locations"
command_desc = "List named Pimlico stores"
[docs] def run_command(self, pipeline, opts):
print("Absolute paths to named storage locations\n")
print(tabulate(pipeline.storage_locations, ["Name", "Path"]))
[docs]class MoveStoresCmd(PimlicoCLISubcommand):
command_name = "movestores"
command_help = "Move a particular module's output from one storage location to another"
command_desc = "Move data between stores"
[docs] def add_arguments(self, parser):
parser.add_argument("dest", help="Name of destination store")
parser.add_argument("modules", nargs="*", help="The names (or numbers) of the module whose output to move")
[docs] def run_command(self, pipeline, opts):
raise NotImplementedError("not yet updated for the new datatypes system: not safe to use!")
dest_store = opts.dest
if dest_store not in pipeline.store_names:
print("No such store: {}".format(dest_store), file=sys.stderr)
print("Available store names: {}".format(", ".join(pipeline.store_names)), file=sys.stderr)
print("Copying modules to store '%s': %s" % (opts.dest, ", ".join(opts.modules)))
for module_name in opts.modules:
# Get the path within the stores
module_path = pipeline[module_name].get_module_output_dir()
# Check which store the module's data is currently in
# TODO Update this for the new datatype system
src_store, src_path = pipeline.find_data(module_path)
if src_store is None:
print("Cannot move data for '{}' as it wasn't found in any store".format(module_name), file=sys.stderr)
continue
elif src_store == dest_store:
print("Skipping '{}' as it's alreay in store '{}'".format(module_name, dest_store), file=sys.stderr)
continue
print("Copying '{}' data from {} to {}".format(module_name, src_store, dest_store))
# Work out where it will lives in the dest store
dest_path = os.path.join(pipeline.named_storage_locations[dest_store], module_path)
move_dir_with_progress(src_path, dest_path)