Source code for pimlico.cli.pimarc

# This file is part of Pimlico
# Copyright (C) 2020 Mark Granroth-Wilding
# Licensed under the GNU LGPL v3.0 - https://www.gnu.org/licenses/lgpl-3.0.en.html

from __future__ import print_function

import os
from tarfile import TarFile

from pimlico.cli.subcommands import PimlicoCLISubcommand
from pimlico.core.modules.base import satisfies_typecheck
from pimlico.datatypes import GroupedCorpus
from pimlico.datatypes.base import DataNotReadyError
from pimlico.utils.pimarc import PimarcWriter


[docs]class Tar2PimarcCmd(PimlicoCLISubcommand): """ Convert grouped corpora from the old tar-based storage format to Pimarc archives. """ command_name = "tar2pimarc" command_help = "Convert grouped corpora from the old tar-based storage format to pimarc"
[docs] def add_arguments(self, parser): parser.add_argument("outputs", nargs="*", help="Specification of module outputs to convert. Specific datasets can " "be given as 'module_name.output_name'. All grouped corpus outputs " "of a module can be converted by just giving 'module_name'. Or, if " "nothing's given, all outputs of all modules are converted") parser.add_argument("--run", action="store_true", help="Run conversion. Without this option, just checks " "what format the corpora use")
[docs] def run_command(self, pipeline, opts): run = opts.run if not run: print("DRY: Not running any conversions, just checking formats") output_specs = opts.outputs if output_specs is None or len(output_specs) == 0: # Nothing given: convert all modules outputs = [] for module_name in pipeline.module_order: # Check module for any grouped corpus outputs module = pipeline[module_name] grouped_outputs = [ name for name in module.output_names if satisfies_typecheck(module.get_output_datatype(name)[1], GroupedCorpus()) ] module_outputs = [(module_name, output) for output in grouped_outputs] if len(module_outputs): print("Including: {}".format(", ".join("{}.{}".format(mn, on) for (mn, on) in module_outputs))) outputs.extend(module_outputs) else: print("No grouped corpus outputs from {}".format(module_name)) else: outputs = [] for output_spec in output_specs: if "." in output_spec: module_name, __, output_name = output_spec.partition(".") module = pipeline[module_name] # Check this output is a grouped corpus if not satisfies_typecheck(module.get_output_datatype(output_name)[1], GroupedCorpus()): print("Skipping {}: not a grouped corpus".format(output_spec)) else: outputs.append((module_name, output_name)) print("Including: {}.{}".format(module_name, output_name)) else: # Just module name: add all outputs that are grouped corpora module_name = output_spec module = pipeline[module_name] grouped_outputs = [ name for name in module.output_names if satisfies_typecheck(module.get_output_datatype(name)[1], GroupedCorpus()) ] module_outputs = [(module_name, output) for output in grouped_outputs] if len(module_outputs): print("Including: {}".format(", ".join("{}.{}".format(mn, on) for (mn, on) in module_outputs))) outputs.extend(module_outputs) else: print("No grouped corpus outputs from {}".format(module_name)) if len(outputs) == 0: print("No corpora to convert") for module_name, output_name in outputs: module = pipeline[module_name] try: corpus = module.get_output(output_name) except DataNotReadyError: print("Skipping {}.{} as data is not ready to read".format(module_name, output_name)) else: if not isinstance(corpus, GroupedCorpus.Reader): # This corpus uses a reader other than the standard grouped corpus reader # This probably means it's produced on the fly, or stored some other way # We therefore can't do any kind of conversion print("Skipping {}.{} which reads its data using {}".format(module_name, output_name, type(corpus))) elif corpus.uses_tar: # Check the format of the stored data # This is an old tar-based corpus # Look for all the tar files tar_paths = [ os.path.join(corpus.data_dir, fn) for fn in corpus.archive_filenames ] if run: print("Converting tar files in {}".format(corpus.data_dir)) tar_to_pimarc(tar_paths) # Remove the tar files for tp in tar_paths: os.remove(tp) else: print("Would convert {}.{} from tar to prc".format(module_name, output_name)) for tp in tar_paths: print(" {}".format(tp)) else: print("Already stored using prc: {}.{}".format(module_name, output_name))
[docs]def tar_to_pimarc(in_tar_paths): for tar_path in in_tar_paths: tar_path = os.path.abspath(tar_path) # Work out where to put the converted file out_filename = "{}.prc".format(os.path.splitext(os.path.basename(tar_path))[0]) out_path = os.path.join(os.path.dirname(tar_path), out_filename) # Create a writer to add files to with PimarcWriter(out_path) as arc: # Read in the tar file tarfile = TarFile.open(tar_path, "r:") for tarinfo in tarfile: name = tarinfo.name with tarfile.extractfile(tarinfo) as tar_member: data = tar_member.read() arc.write_file(data, name)