# This file is part of Pimlico
# Copyright (C) 2020 Mark Granroth-Wilding
# Licensed under the GNU LGPL v3.0 - https://www.gnu.org/licenses/lgpl-3.0.en.html
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from builtins import input
import json
import os
import shutil
import socket
import tarfile
import sys
from datetime import datetime
from io import BytesIO
from pimlico.cli.subcommands import PimlicoCLISubcommand
from pimlico.utils.core import remove_duplicates
class DumpCmd(PimlicoCLISubcommand):
    """
    Dump the entire available output data from a given pipeline module to a
    tarball, so that it can easily be loaded into the same pipeline on another
    system. This is primarily to support spreading the execution of a pipeline
    between multiple machines, so that the output from a module can easily be
    transferred and loaded into a pipeline.

    Dump to a tarball using this command, transfer the file between machines and
    then run the :doc:`load command </commands/load>` to import it there.

    .. seealso::

       :doc:`/guides/multiple_servers`: for a more detailed guide to transferring data across servers

    """
    command_name = "dump"
    command_help = "Dump the entire available output data from a given pipeline module to a " \
                   "tarball, so that it can easily be loaded into the same pipeline on another " \
                   "system. This is primarily to support spreading the execution of a pipeline " \
                   "between multiple machines, so that the output from a module can easily be " \
                   "transferred and loaded into a pipeline"
    command_desc = "Dump the entire available output data from a given pipeline module to a tarball"

    def add_arguments(self, parser):
        """Register the dump command's arguments on the given argparse parser."""
        parser.add_argument("modules", nargs="*",
                            help="Names or numbers of modules whose data to dump. If multiple are given, a separate "
                                 "file will be dumped for each")
        parser.add_argument("--output", "-o",
                            help="Path to directory to output to. Defaults to the current user's home directory")
        parser.add_argument("--inputs", "-i", action="store_true",
                            help="Dump data for the modules corresponding to the inputs of the named modules, "
                                 "instead of those modules themselves. Useful for when you're preparing to "
                                 "run a module on a different machine, for getting all the necessary input "
                                 "data for a module")

    def run_command(self, pipeline, opts):
        """
        Write one ``<module_name>.tar.gz`` per selected module into the output
        directory. Each tarball contains the module's output directory plus a
        ``dump_metadata.json`` file that the load command reads to identify
        which pipeline/module the data belongs to.

        Exits with status 1 if any named module does not exist in the pipeline.
        Modules with no output data yet are reported and skipped.
        """
        output_dir = opts.output
        if output_dir is None:
            # Default to current user's home dir
            output_dir = os.path.expanduser("~")
        output_dir = os.path.abspath(output_dir)

        try:
            named_modules = [pipeline[module_name] for module_name in opts.modules]
        except KeyError as e:
            print("Error: module does not exist: {}".format(e), file=sys.stderr)
            sys.exit(1)

        if opts.inputs:
            # Instead of the named modules, dump the modules that feed their inputs:
            # useful when preparing to run the named modules on another machine
            print("Dumping data for inputs to {}".format(", ".join(opts.modules)))
            modules = []
            for module in named_modules:
                for input_name in module.input_names:
                    modules.extend(
                        prev_mod
                        for (prev_mod, output_name) in module.get_input_module_connection(input_name, always_list=True)
                    )
            # Don't include a module multiple times
            modules = remove_duplicates(modules, key=lambda m: m.module_name)
            print("Modules: {}\n".format(", ".join(mod.module_name for mod in modules)))
        else:
            modules = named_modules

        for module in modules:
            module_name = module.module_name
            print("== Dumping data for '%s' ==" % module_name)
            # Get the output dir for the module
            module_rel_output_dir = module.get_module_output_dir()
            # See if it exists in any store
            store_name, module_abs_output_dir = pipeline.find_data(module_rel_output_dir)
            if store_name is None:
                print("Could not find any output data for module '{}'. Not dumping anything".format(module_name))
                continue
            else:
                print("Data found in {} store".format(store_name))

            # Work out where to put the output
            tarball_path = os.path.join(output_dir, "%s.tar.gz" % module_name)
            print("Dumping to %s\n" % tarball_path)

            with tarfile.open(tarball_path, mode="w:gz") as tarball:
                # Prepare a file containing a bit of metadata that will be read by Pimlico if the data is loaded
                dump_meta = json.dumps({
                    "pipeline_name": pipeline.name,
                    "module_name": module_name,
                    # Just so we know where this came from...
                    "dumped": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "hostname": socket.gethostname(),
                    "variant": pipeline.variant,
                }, indent=4).encode("utf-8")
                meta_info = tarfile.TarInfo("dump_metadata.json")
                # The declared size must be the byte length of the encoded payload,
                # not the character length of the str: the two differ for non-ASCII
                # names/hostnames, and a too-small size would truncate the stored JSON
                meta_info.size = len(dump_meta)
                tarball.addfile(meta_info, BytesIO(dump_meta))
                # Add the module's output directory to the tarball (recursively)
                tarball.add(module_abs_output_dir, arcname=module_name)
class LoadCmd(PimlicoCLISubcommand):
    """
    Load the output data for a given pipeline module from a tarball previously created by the
    `dump` command (typically on another machine).
    This is primarily to support spreading the execution of a pipeline
    between multiple machines, so that the output from a module can easily be
    transferred and loaded into a pipeline.

    Dump to a tarball using the :doc:`dump command </commands/dump>`,
    transfer the file between machines and
    then run this command to import it there.

    .. seealso::

       :doc:`/guides/multiple_servers`: for a more detailed guide to transferring data across servers

    """
    command_name = "load"
    command_help = "Load a module's output data from a tarball previously created by the dump " \
                   "command, usually on a different system. This will overwrite any output data " \
                   "the module already has completely, including metadata, run history, etc. " \
                   "You may load multiple modules' data at once"
    command_desc = "Load a module's output data from a tarball previously created by the dump command"

    def add_arguments(self, parser):
        """Register the load command's arguments on the given argparse parser."""
        parser.add_argument("paths", nargs="*", help="Paths to dump files (tarballs) to load into the pipeline")
        parser.add_argument("--force-overwrite", "-f", action="store_true",
                            help="If data already exists for a module being imported, overwrite without asking. "
                                 "By default, the user will be prompted to check whether they want to overwrite")

    def run_command(self, pipeline, opts):
        """
        Extract each given dump tarball into the corresponding module's output
        directory, replacing any existing data for that module (after prompting,
        unless ``--force-overwrite`` is given).

        Exits with status 1 on a pipeline-name or variant mismatch, since that
        suggests the wrong dump is being loaded; other per-file problems are
        reported and the remaining paths are still processed.
        """
        force_overwrite = opts.force_overwrite
        for dump_path in opts.paths:
            dump_path = os.path.abspath(dump_path)
            # Try reading a tarball at the given path
            with tarfile.open(dump_path, mode="r:gz") as tarball:
                # Tarball should include a metadata file if it's come from a dump
                try:
                    metadata_file = tarball.extractfile("dump_metadata.json")
                except KeyError:
                    # Not produced by the dump command: report cleanly instead of a raw traceback
                    print("Error: no dump_metadata.json found in {}: this does not look like a "
                          "Pimlico dump file".format(dump_path), file=sys.stderr)
                    continue
                dump_meta = json.loads(metadata_file.read().decode("utf-8"))
                metadata_file.close()

                module_name = dump_meta["module_name"]
                print("== Loading data for '%s' from %s ==" % (module_name, dump_path))

                # Check that the pipeline that this was dumped from had the same name
                # This is just a weak sanity check to avoid mixing up data between pipelines
                # In future, it may be desirable to add a '-f' switch to override this check
                if pipeline.name != dump_meta["pipeline_name"]:
                    print("Error: the dumped data came from a pipeline called '%s', but you're trying to " \
                          "load it into '%s'" % (dump_meta["pipeline_name"], pipeline.name), file=sys.stderr)
                    sys.exit(1)
                # Also check we've loaded the correct variant of the pipeline
                # Would be nice to just load the right one automatically, but it's tricky to go back and load it again
                # Instead, just output an error, so the user can load the right one
                # Older dumps may predate the "variant" key, so default to "main"
                dump_variant = dump_meta.get("variant", "main")
                if pipeline.variant != dump_variant:
                    print("Error: the dumped data came from the '%s' variant of the pipeline, " \
                          "but you're trying to load it into '%s'. Load the correct variant with '-v'" % \
                          (dump_variant, pipeline.variant), file=sys.stderr)
                    sys.exit(1)

                try:
                    module = pipeline[module_name]
                except KeyError:
                    print("Error: module with name '{}' does not exist in the pipeline".format(module_name), file=sys.stderr)
                    continue

                # Get the output dir for the module
                module_output_dir = module.get_module_output_dir(absolute=True)
                print("Extracting module data to %s" % module_output_dir)
                # If the output dir already exists, delete it first
                if os.path.exists(module_output_dir):
                    if not force_overwrite:
                        # In this case, check we're happy to overwrite
                        sure = input("Module output dir already exists. Overwrite? [y/N] ")
                        if sure.lower() != "y":
                            print("Skipping %s" % module_name)
                            continue
                    shutil.rmtree(module_output_dir)

                # Extract the directory into the module output dir's super directory (i.e. the pipeline dir)
                # Include the top-level directory entry itself (name == module_name) as well as its
                # contents, so the directory's own permissions/mtime get restored too
                # NOTE(review): extractall trusts member names; a crafted archive with names like
                # "module/../x" would pass this prefix check. Only load dumps from trusted sources
                # (or use tarfile's extraction filters on Python 3.12+)
                extraction_dir = os.path.dirname(module_output_dir)
                to_extract = [
                    tarinfo for tarinfo in tarball.getmembers()
                    if tarinfo.name == module_name or tarinfo.name.startswith("%s/" % module_name)
                ]
                tarball.extractall(path=extraction_dir, members=to_extract)
                print("Data loaded")