# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html
"""
Datatypes provide interfaces for reading (and in some cases writing) datasets. At their most basic,
they define a way to iterate over a dataset linearly. Some datatypes may also provide other functionality,
such as random access or compression.
As much as possible, Pimlico pipelines should use standard datatypes to connect up the output of modules
with the input of others. Most datatypes have a lot in common, which should be reflected in their sharing
common base classes. Custom datatypes will be needed for most datasets when they're used as inputs, but
as far as possible, these should be converted into standard datatypes like
:class:`~pimlico.datatypes.tar.TarredCorpus`, early in the pipeline.
.. note::

   The following classes were moved to :mod:`~pimlico.datatypes.core` in version 0.6rc
"""
import cPickle as pickle
import os
import re
import json
from traceback import format_exc
from pimlico.cli.shell.base import ShellCommand
from pimlico.datatypes.documents import RawDocumentType
from pimlico.utils.core import import_member
from pimlico.utils.progress import get_progress_bar
# Names exported by a star-import of this module
__all__ = [
    "PimlicoDatatype",
    "PimlicoDatatypeWriter",
    "IterableCorpus",
    "IterableCorpusWriter",
    "DynamicOutputDatatype",
    "DynamicInputDatatypeRequirement",
    "InvalidDocument",
    "DatatypeLoadError",
    "DatatypeWriteError",
    "load_datatype",
    "MultipleInputs",
]

# Matches a lower-case letter directly followed by an upper-case letter, i.e. the word
# boundaries inside a CamelCase class name. Used to build a default datatype name
# from a class name.
_class_name_word_boundary = re.compile(r"([a-z])([A-Z])")
class PimlicoDatatype(object):
    """
    The abstract superclass of all datatypes. Provides basic functionality for identifying where
    data should be stored and such.

    Datatypes are used to specify the routines for reading the output from modules. They're also
    used to specify how to read pipeline inputs. Most datatypes that have data simply read it in
    when required. Some (in particular those used as inputs) need a preparation phase to be run,
    where the raw data itself isn't sufficient to implement the reading interfaces required. In this
    case, they should override prepare_data().

    Datatypes may require/allow options to be set when they're used to read pipeline inputs. These
    are specified, in the same way as module options, by input_module_options on the datatype class.

    Datatypes may supply a set of additional datatypes. These should be guaranteed to be available if the
    main datatype is available. They must require no extra processing to be made available, unless that
    is done on the fly while reading the datatype (like a filter) or while the main datatype is being
    written.

    Additional datatypes can be accessed in config files by specifying the main datatype (as a previous module,
    optionally with an output name) and the additional datatype name in the form `main_datatype->additional_name`.
    Multiple additional names may be given, causing the next name to be looked up as an additional
    datatype of the initially loaded additional datatype. E.g. `main_datatype->additional0->additional1`.

    To avoid conflicts in the metadata between datatypes using the same directory, datatypes loaded as additional
    datatypes have their additional name available to them and use it as a prefix to the metadata filename.

    If `use_main_metadata=True` on an additional datatype, the same metadata will be read as for the main
    datatype to which this is an additional datatype.

    `module` is the ModuleInfo instance for the pipeline module that this datatype was produced by. It may
    be None, if the datatype wasn't instantiated by a module. It is not required to be set if you're
    instantiating a datatype in some context other than module output. It should generally be set for
    input datatypes, though, since they are treated as being created by a special input module.

    """
    datatype_name = "base_datatype"
    requires_data_preparation = False
    input_module_options = {}

    #: Override to provide shell commands specific to this datatype. Should include the superclass' list.
    shell_commands = []

    #: List of additional datatypes provided by this one, given as (name, datatype class) pairs.
    #: For each of these, a call to `get_additional_datatype(name)` (once the main datatype is ready)
    #: should return a datatype instance that is also ready.
    supplied_additional = []

    #: Most datatype classes define their own type of corpus, which is often a subtype of some other. Some,
    #: however, emulate another type and it is that type that should be considered to be the type of the
    #: dataset, not the class itself.
    #:
    #: For example, TarredCorpusFilter dynamically produces something that looks like a TarredCorpus,
    #: and further down the pipeline, if its type is needed, it should be considered to be a TarredCorpus.
    #:
    #: Most of the time, this can be left empty, but occasionally it needs to be set.
    emulated_datatype = None

    def __init__(self, base_dir, pipeline, module=None, additional_name=None, use_main_metadata=False, **kwargs):
        """
        :param base_dir: base directory of the dataset, or None if the datatype doesn't need one
        :param pipeline: pipeline the datatype belongs to; used to resolve `base_dir` to an existing path
        :param module: ModuleInfo of the module that produced this datatype, if any
        :param additional_name: name under which this was loaded as an additional datatype, if any
        :param use_main_metadata: read the main datatype's metadata file even if `additional_name` is set
        :param kwargs: datatype options; missing ones are filled in from `input_module_options` defaults
        """
        self.use_main_metadata = use_main_metadata
        self.additional_name = additional_name
        self.pipeline = pipeline
        self.base_dir = base_dir
        # Search for an absolute path to the base dir that exists
        self.absolute_base_dir = pipeline.find_data_path(base_dir) if self.base_dir is not None else None
        self.data_dir = os.path.join(self.absolute_base_dir, "data") if self.absolute_base_dir is not None else None
        self.module = module
        self._metadata_filename = None

        self.options = kwargs
        # Set default options from the input module options
        # If loading as an input datatype, these will already have been set, but otherwise they might not be present
        for opt_name, opt_dict in self.input_module_options.iteritems():
            if opt_name not in self.options:
                self.options[opt_name] = opt_dict.get("default", None)

        # If the overriding class doesn't set datatype_name, we should default to something sensible
        if self.datatype_name == "base_datatype" and type(self) is not PimlicoDatatype:
            # Build a better name out of the class name
            self.datatype_name = _class_name_word_boundary.sub(r"\1_\2", type(self).__name__).lower()

    def _get_metadata(self):
        """
        Read in metadata from a file in the corpus directory.

        Note that this is no longer cached in memory. We need to be sure that the metadata values returned are
        always up to date with what is on disk, so always re-read the file when we need to get a value from
        the metadata. Since the file is typically small, this is unlikely to cause a problem. If we decide to
        return to cacheing the metadata dictionary in future, we will need to make sure that we can never run into
        problems with out-of-date metadata being returned.

        :return: metadata dictionary; empty if the file doesn't exist (yet) or is empty
        """
        if self._metadata_filename is None and self.absolute_base_dir is not None:
            # Not previously been able to access metadata, but corpus dir appears to be available now
            if self.additional_name is None or self.use_main_metadata:
                metadata_filename = "corpus_metadata"
            else:
                metadata_filename = "%s_corpus_metadata" % self.additional_name
            self._metadata_filename = os.path.join(self.absolute_base_dir, metadata_filename)

        if self._metadata_filename is not None and os.path.exists(self._metadata_filename):
            # Load dictionary of metadata
            with open(self._metadata_filename, "r") as f:
                raw_data = f.read()
            if len(raw_data) == 0:
                # Empty metadata file: return empty metadata no matter what
                return {}
            try:
                # In later versions of Pimlico, we store metadata as JSON, so that it can be read in the file
                return json.loads(raw_data)
            except ValueError:
                # If the metadata was written by an earlier Pimlico version, we fall back to the old system:
                # it's a pickled dictionary
                # NOTE(review): unpickling executes arbitrary code, so this fallback is only safe for
                # metadata files from a trusted source
                return pickle.loads(raw_data)
        else:
            # No metadata written: data may not have been written yet
            return {}
    metadata = property(_get_metadata)

    def get_required_paths(self):
        """
        Returns a list of paths to files that should be available for the data to be read.
        The base data_ready() implementation checks that these are all available and, if the datatype
        is used as an input to a pipeline and requires a data preparation routine to be run, data
        preparation will not be executed until these files are available.

        Paths may be absolute or relative. If relative, they refer to files within the data directory
        and data_ready() will fail if the data dir doesn't exist.

        :return: list of absolute or relative paths
        """
        return []

    def get_software_dependencies(self):
        """
        Check that all software required to read this datatype is installed and locatable. This is
        separate to metadata config checks, so that you don't need to satisfy the dependencies for
        all modules in order to be able to run one of them. You might, for example, want to run different
        modules on different machines. This is called when a module is about to be executed and each of the
        dependencies is checked.

        Returns a list of instances of subclasses of :class:`~pimlico.core.dependencies.base.SoftwareDependency`,
        representing the libraries that this module depends on.

        Take care when providing dependency classes that you don't put any import statements at the top of the Python
        module that will make loading the dependency type itself dependent on runtime dependencies.
        You'll want to run import checks by putting import statements within this method.

        You should call the super method for checking superclass dependencies.

        """
        return []

    def prepare_data(self, output_dir, log):
        """
        Data preparation routine. Does nothing by default: datatypes that need a preparation
        phase override this.
        """
        return

    @classmethod
    def create_from_options(cls, base_dir, pipeline, options={}, module=None):
        """
        Instantiate the datatype, passing the entries of `options` as keyword arguments.
        """
        # The default dict is only ever unpacked, never mutated, so sharing it between calls is harmless
        return cls(base_dir, pipeline, module=module, **options)

    def data_ready(self):
        """
        Check whether the data corresponding to this datatype instance exists and is ready to be read.

        Default implementation checks that every path from get_required_paths() exists and that the data dir
        has been located (unless no base dir is needed). Subclasses might want to add their own
        checks, or even override this, if the data dir isn't needed.

        :return: True if the data is ready to be read
        """
        # Go through the files that the datatype depends on
        for path in self.get_required_paths():
            # isabs() is a test; abspath() would return a (always truthy) string and send
            # relative paths down the absolute branch, resolving them against the working directory
            if os.path.isabs(path):
                # Simply check whether the file exists
                if not os.path.exists(path):
                    return False
            elif self.data_dir is None:
                # Relative path: requires that data_dir exists
                return False
            elif not os.path.exists(os.path.join(self.data_dir, path)):
                # Data dir is ready, but the file within it doesn't exist
                return False
        # If base_dir is None, the datatype doesn't need it
        # data_dir is None unless the data dir has been located
        return self.base_dir is None or self.data_dir is not None

    def get_detailed_status(self):
        """
        Returns a list of strings, containing detailed information about the data.
        Only called if data_ready() == True.

        Subclasses may override this to supply useful (human-readable) information specific to the datatype.
        They should call the super method.
        """
        return []

    @classmethod
    def datatype_full_class_name(cls):
        """
        The fully qualified name of the class for this datatype, by which it is referenced in config files.
        Generally, datatypes don't need to override this, but type requirements that take the place of datatypes
        for type checking need to provide it.

        """
        return "%s.%s" % (cls.__module__, cls.__name__)

    def instantiate_additional_datatype(self, name, additional_name):
        """
        Default implementation just assumes the datatype class can be instantiated using the default constructor,
        with the same base dir and pipeline as the main datatype. Options given to the main datatype are passed
        down to the additional datatype.

        :param name: name of the additional datatype, as listed in `supplied_additional`
        :param additional_name: additional name to pass on to the new datatype instance
        """
        return dict(self.supplied_additional)[name](
            self.base_dir, self.pipeline, additional_name=additional_name, module=self.module, **self.options)

    @classmethod
    def check_type(cls, supplied_type):
        """
        Method used by datatype type-checking algorithm to determine whether a supplied datatype (given as a
        class, which is a subclass of PimlicoDatatype) is compatible with the present datatype, which is being
        treated as a type requirement.

        Typically, the present class is a type requirement on a module input and `supplied_type` is the type provided
        by a previous module's output.

        The default implementation simply checks whether `supplied_type` is a subclass of the present class. Subclasses
        may wish to impose different or additional checks.

        :param supplied_type: type provided where the present class is required, or datatype instance
        :return: True if the check is successful, False otherwise

        """
        if supplied_type.emulated_datatype is not None:
            # The supplied type stands in for another: check that one instead
            return cls.check_type(supplied_type.emulated_datatype)
        if isinstance(supplied_type, type):
            return issubclass(supplied_type, cls)
        else:
            return isinstance(supplied_type, cls)

    @classmethod
    def type_checking_name(cls):
        """
        Supplies a name for this datatype to be used in type-checking error messages. Default implementation
        just provides the class name. Classes that override check_type() may want to override this too.

        """
        return cls.__name__

    @classmethod
    def full_datatype_name(cls):
        """
        Returns a string/unicode name for the datatype that includes relevant sub-type information. The
        default implementation just uses the attribute `datatype_name`, but subclasses may have more
        detailed information to add. For example, iterable corpus types also supply information about the
        data-point type.

        """
        return cls.datatype_name
class DynamicOutputDatatype(object):
    """
    Stand-in for a module output type that can only be decided once the module is known.

    A module output's type is normally given as a subclass of :class:`.PimlicoDatatype`. It may
    instead be given as an *instance* of a DynamicOutputDatatype subclass. get_datatype() is then
    called when the output datatype is needed, with the module's module info instance, so that a
    specialized datatype can be produced on the basis of options, input types, etc.

    The dynamic type must provide certain pieces of information needed for typechecking.

    """
    #: Must be provided by subclasses: can be a noncommittal string giving some idea of what
    #: types may be provided. Used for documentation.
    datatype_name = None

    def get_datatype(self, module_info):
        """
        Produce the actual output datatype for the given module info. Subclasses must implement this.
        """
        raise NotImplementedError()

    def get_base_datatype_class(self):
        """
        Return the base datatype class that will be produced, if that can be said before a ModuleInfo
        instance is available. Returns None by default.

        If this information is available, it will be used in documentation.

        """
        return None
class PimlicoDatatypeWriter(object):
    """
    Abstract base class for data writers associated with Pimlico datatypes.

    Used as a context manager: metadata is written to the base dir on entry and again on a clean exit.
    On a clean exit, it is also checked that every task registered with require_tasks() has been
    marked as complete.

    """
    def __init__(self, base_dir, additional_name=None):
        """
        :param base_dir: directory to write to; its "data" subdirectory is created if necessary
        :param additional_name: name of the additional datatype being written, if any. Used as a
            prefix to the metadata filename
        """
        self.additional_name = additional_name
        self.base_dir = base_dir
        self.data_dir = os.path.join(self.base_dir, "data")
        self.metadata = {}
        # Make sure the necessary directories exist
        if not os.path.exists(self.data_dir):
            os.makedirs(self.data_dir)
        # Stores a set of output tasks that must be completed before the exit routine is called
        # Subclasses can add things to this in their init and remove them as the tasks are performed
        # The superclass exit will check that the set is empty
        self._to_output = set()

    def require_tasks(self, *tasks):
        """
        Add a name or multiple names to the list of output tasks that must be completed before writing is finished

        """
        self._to_output.update(tasks)

    def task_complete(self, task):
        """
        Mark a required output task as done. Names that are not pending are ignored.
        """
        self._to_output.discard(task)

    @property
    def incomplete_tasks(self):
        """List of the required output tasks that have not yet been marked as complete."""
        return list(self._to_output)

    def write_metadata(self):
        """
        Write the current `metadata` dictionary as JSON to the metadata file in the base dir.

        The filename is "corpus_metadata", prefixed with the additional name if there is one, which is
        the name that :class:`PimlicoDatatype` looks for when reading metadata.

        """
        if self.additional_name is None:
            metadata_filename = "corpus_metadata"
        else:
            metadata_filename = "%s_corpus_metadata" % self.additional_name
        with open(os.path.join(self.base_dir, metadata_filename), "w") as f:
            json.dump(self.metadata, f)

    def __enter__(self):
        # Store an initial version of the metadata
        self.write_metadata()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # If the with-block raised, leave the metadata as it is and let the exception propagate
        if exc_type is None:
            self.write_metadata()
            # Check all required output tasks were completed
            if len(self._to_output):
                raise DatatypeWriteError("some outputs were not written for datatype %s: %s" %
                                         (type(self).__name__, ", ".join(self._to_output)))

    def subordinate_additional_name(self, name):
        """
        Build the additional name for a datatype that is additional to this one: `name` itself, or
        `name` appended to this writer's own additional name with "->".
        """
        if self.additional_name is not None:
            return "%s->%s" % (self.additional_name, name)
        else:
            return name
class CountInvalidCmd(ShellCommand):
    """
    Data shell command that counts how many documents in the shell's dataset are invalid.
    Applies to any iterable corpus.

    """
    commands = ["invalid"]
    help_text = "Count the number of invalid documents in this dataset"

    def execute(self, shell, *args, **kwargs):
        """
        Iterate over the dataset held by the shell, showing a progress bar, and print the number of
        documents that are :class:`InvalidDocument` instances alongside the total.
        """
        dataset = shell.data
        progress = get_progress_bar(len(dataset), title="Counting")
        num_invalid = 0
        for __, doc in progress(dataset):
            if isinstance(doc, InvalidDocument):
                num_invalid += 1
        # A single parenthesized argument prints the same whether print is a statement or a function
        print("%d / %d documents are invalid" % (num_invalid, len(dataset)))
class IterableCorpus(PimlicoDatatype):
    """
    Superclass of all datatypes which represent a dataset that can be iterated over document by document
    (or datapoint by datapoint - what exactly we're iterating over may vary, though documents are most common).

    The actual type of the data depends on the subclass: it could be, e.g. coref output, etc. Information about
    the type of individual documents is provided by `data_point_type` and this is used in type checking.

    At creation time, length should be provided in the metadata, denoting how many documents are in the dataset.

    """
    datatype_name = "iterable_corpus"
    data_point_type = RawDocumentType
    shell_commands = PimlicoDatatype.shell_commands + [CountInvalidCmd()]

    def __init__(self, *args, **kwargs):
        """
        Takes the same arguments as :class:`PimlicoDatatype`, plus the keyword arguments:

        :param raw_data: if True, process_document_data_with_datatype() skips the data-point type's
            processing. Default False
        :param as_type: stored on the instance as `as_type`. Default None
        """
        self.raw_data = kwargs.pop("raw_data", False)
        self.as_type = kwargs.pop("as_type", None)
        super(IterableCorpus, self).__init__(*args, **kwargs)
        # Prepare the document datatype instance
        # Pass in all the options/kwargs we've got, which include any options that the document type specifies
        self.data_point_type_instance = self.data_point_type(self.options, self.metadata)

    def __iter__(self):
        """
        Subclasses should implement an iter method that simply iterates over all the documents in the
        corpus in a consistent order. They may also provide other methods for iterating over or otherwise
        accessing the data.

        Each yielded document should consist of a pair `(name, doc)`, where `name` is an identifier for the document
        (e.g. filename) and `doc` is the document's data, in the appropriate type.

        You may wish to call `process_document_data_with_datatype()` on the raw content to apply the data-point
        type's standard processing to it.

        """
        raise NotImplementedError

    def __len__(self):
        """
        Number of documents in the corpus, read from the "length" metadata value.

        :raises DatatypeLoadError: if no length is stored in the metadata
        """
        try:
            return self.metadata["length"]
        except KeyError:
            raise DatatypeLoadError("no length found in metadata for %s corpus. It is an iterable corpus, so if it "
                                    "is ready to use, the length should have been stored. Metadata keys found: %s" %
                                    (self.datatype_name, self.metadata.keys()))

    def get_detailed_status(self):
        """Add the corpus length to the superclass' status lines."""
        return super(IterableCorpus, self).get_detailed_status() + [
            "Length: %d" % len(self)
        ]

    @classmethod
    def check_type(cls, supplied_type):
        """
        Override type checking to require that the supplied type have a document type that is compatible with
        (i.e. a subclass of) the document type of this class.

        :param supplied_type: datatype class or instance to check against this class
        :return: True if `supplied_type` is an iterable corpus with a compatible data-point type
        """
        if isinstance(supplied_type, type):
            main_type_check = issubclass(supplied_type, IterableCorpus)
        else:
            main_type_check = isinstance(supplied_type, IterableCorpus)
        return main_type_check and issubclass(supplied_type.data_point_type, cls.data_point_type)

    @classmethod
    def type_checking_name(cls):
        """Class name with the data-point type's name, for type-checking error messages."""
        return "%s<%s>" % (cls.__name__, cls.data_point_type.__name__)

    @classmethod
    def full_datatype_name(cls):
        """Datatype name with the data-point type's name."""
        return "%s<%s>" % (cls.datatype_name, cls.data_point_type.__name__)

    def process_document_data_with_datatype(self, data):
        """
        Applies the corpus' data-point type's process_document() method to the raw data.

        Data that represents an invalid document is returned as an :class:`InvalidDocument` without
        further processing. If `raw_data` was set on the corpus, valid data is returned unprocessed.

        :param data: raw document data, or an InvalidDocument
        :return: the processed document, or an InvalidDocument if the data was invalid or
            an error occurred while processing it
        """
        # Catch invalid documents
        document = InvalidDocument.invalid_document_or_text(data)
        # Apply subclass-specific post-processing if we've not been asked to yield just the raw data
        if type(document) is not InvalidDocument and not self.raw_data:
            try:
                document = self.data_point_type_instance.process_document(document)
            except Exception as e:
                # If there's any problem reading in the document, yield an invalid doc with the error
                # Only Exception is caught, so that KeyboardInterrupt and SystemExit still propagate
                # instead of being recorded as a document error
                document = InvalidDocument("datatype %s reader" % self.datatype_name, "%s: %s" % (e, format_exc()))
        return document
def iterable_corpus_with_data_point_type(data_point_type):
    """
    Build, on the fly, a subclass of IterableCorpus whose data-point type is `data_point_type`.

    Static subclasses that set their own data-point type cover most cases, but for type-checking
    purposes it is occasionally useful to construct a new, specialized iterable corpus dynamically.

    :param data_point_type: data-point type class for the new corpus type
    :return: new class, named after the data-point type

    """
    class_name = "%sIterableCorpus" % data_point_type.__name__
    base_classes = (IterableCorpus,)
    class_attrs = {"data_point_type": data_point_type}
    return type(class_name, base_classes, class_attrs)
class IterableCorpusWriter(PimlicoDatatypeWriter):
    """
    Writer for iterable corpora. On top of the superclass' exit checks, requires that a "length"
    value has been set in the metadata by the time writing finishes.

    """
    def __exit__(self, exc_type, exc_val, exc_tb):
        super(IterableCorpusWriter, self).__exit__(exc_type, exc_val, exc_tb)
        # Check the length has been set
        # Only do so if the with-block finished cleanly: if it raised, the length may simply not have
        # been reached yet and raising here would hide the original exception
        if exc_type is None and "length" not in self.metadata:
            raise DatatypeWriteError("writer for IterableDocumentCorpus must set a 'length' value in the metadata")
class InvalidDocument(object):
    """
    Placeholder for a document that is empty because a module failed while processing it, rather
    than because the original input document was empty.

    Widely used in Pimlico. Document readers/writers should generally be robust to this and simply pass
    the whole thing through where possible, so that wherever one of these pops up it's always possible
    to work out where the error occurred.

    """
    def __init__(self, module_name, error_info=None):
        self.module_name = module_name
        self.error_info = error_info

    def __unicode__(self):
        details = self.error_info or ""
        return u"***** EMPTY DOCUMENT *****\nEmpty due to processing error in module: %s\n\nFull error details:\n%s" % \
               (self.module_name, details)

    def __str__(self):
        as_unicode = unicode(self)
        return as_unicode.encode("ascii", "ignore")

    @staticmethod
    def load(text):
        """
        Parse the text form of an invalid document (as produced by `__unicode__`) back into an
        InvalidDocument.

        :raises ValueError: if the text doesn't start with the empty-document marker
        """
        if text.startswith("***** EMPTY DOCUMENT *****"):
            # Drop the marker line
            __, __, remainder = text.partition("\n")
            # The module line is separated from the error details by a blank line
            module_line, __, details = remainder.partition("\n\n")
            __, __, module_name = module_line.partition(": ")
            # Drop the heading line of the error details
            __, __, error_info = details.partition("\n")
            return InvalidDocument(module_name, error_info)
        raise ValueError("tried to read empty document text from invalid text: %s" % text)

    @staticmethod
    def invalid_document_or_text(text):
        """
        Return an InvalidDocument if `text` is one already or is the text form of one.
        Otherwise, return the text as is.

        """
        if isinstance(text, InvalidDocument):
            return text
        if text.startswith("***** EMPTY DOCUMENT *****"):
            return InvalidDocument.load(text)
        return text
class TypeFromInput(DynamicOutputDatatype):
    """
    Dynamic output type that is the same as the type of one of the module's inputs. The input's
    type is passed through exactly, except where the input datatype provides an `emulated_datatype`.

    The name of the input to use may be given. Otherwise, the default input is used.

    """
    datatype_name = "same as input corpus"

    def __init__(self, input_name=None):
        self.input_name = input_name

    def get_datatype(self, module_info):
        """
        Look up the datatype of the chosen input on `module_info` and return it, or the type it
        emulates if it has one.
        """
        input_type = module_info.get_input_datatype(self.input_name)
        if input_type.emulated_datatype is None:
            return input_type
        # The input datatype emulates another: it is that other that we produce as output
        return input_type.emulated_datatype
class DatatypeLoadError(Exception):
    """Raised when a datatype class, or data that a datatype needs, cannot be loaded."""
class DatatypeWriteError(Exception):
    """Raised by datatype writers when the output has not been fully or correctly written."""
def load_datatype(path):
    """
    Try loading a datatype class for a given path. Raises a DatatypeLoadError if it's not a valid
    datatype path.

    :param path: path to the datatype class, as accepted by `import_member`
    :return: the datatype class
    :raises DatatypeLoadError: if the import fails, or the result is not a class or not a
        subclass of PimlicoDatatype
    """
    try:
        cls = import_member(path)
    except ImportError as e:
        raise DatatypeLoadError("could not load datatype class %s: %s" % (path, e))

    # Use isinstance() rather than comparing to the exact type of `object`, so that classes
    # with a custom metaclass are still recognised as classes
    if not isinstance(cls, type):
        raise DatatypeLoadError("tried to load datatype %s, but result was not a class, it was a %s" %
                                (path, type(cls).__name__))

    if not issubclass(cls, PimlicoDatatype):
        raise DatatypeLoadError("%s is not a Pimlico datatype" % path)
    return cls