Source code for pimlico.core.modules.inputs
# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html
"""
Base classes and utilities for input modules in a pipeline.
"""
import copy
from pimlico.core.config import PipelineStructureError
from pimlico.datatypes.base import IterableCorpus
from .base import BaseModuleInfo
from pimlico.core.modules.base import BaseModuleExecutor
[docs]class InputModuleInfo(BaseModuleInfo):
"""
Base class for input modules. These don't get executed in general, they just provide a way to iterate over
input data.
You probably don't want to subclass this. It's usually simplest to define a datatype for reading the input
data and then just specify its class as the module's type. This results in a subclass of this module info
being created dynamically to read that data.
Note that module_executable is typically set to False and the base class does this. However, some input
modules need to be executed before the input is usable, for example to collect stats about the input
data.
"""
module_type_name = "input"
module_executable = False
[docs] def instantiate_output_datatype(self, output_name, output_datatype, **kwargs):
raise NotImplementedError("input module type (%s) must implement its own datatype instantiator" %
self.module_type_name)
[docs]def input_module_factory(datatype):
"""
Create an input module class to load a given datatype.
"""
input_module_options = copy.copy(datatype.input_module_options)
if issubclass(datatype, IterableCorpus):
# Also get input options from the document type
input_module_options.update(datatype.data_point_type.input_module_options)
# Add a special option to allow a dir to be specified to read the data from
# This will become the base_dir for the datatype when instantiated
input_module_options["dir"] = {
"help": "Directory to read the data from. May be used to load a dataset from an output from another "
"Pimlico pipeline. If not given, the datatype's base dir will be the expected base dir within "
"this pipeline's data directory, which usually won't exist",
}
class DatatypeInputModuleInfo(InputModuleInfo):
module_type_name = "%s_input" % datatype.datatype_name
module_readable_name = "%s datatype input" % datatype.datatype_name
module_outputs = [("data", datatype)]
module_options = input_module_options
def __init__(self, module_name, pipeline, **kwargs):
super(DatatypeInputModuleInfo, self).__init__(module_name, pipeline, **kwargs)
self.override_base_dir = self.options["dir"]
def get_output_dir(self, output_name, short_term_store=False):
if self.override_base_dir is None:
if datatype.requires_data_preparation:
# During data preparation, this directly will be created and some data stored there
# The data is only ready once we pass data_ready() in the normal way
return super(DatatypeInputModuleInfo, self).get_output_dir(output_name,
short_term_store=short_term_store)
else:
# No data preparation required, which means that this input datatype never stores anything
# It therefore has a None base_dir, which causes the datatype to be satisfied without it,
# providing any further checks provided by its data_ready() are satisfied
return None
else:
return self.override_base_dir
def instantiate_output_datatype(self, output_name, output_datatype, **kwargs):
return output_datatype.create_from_options(self.get_output_dir(output_name), self.pipeline,
copy.deepcopy(self.options), module=self)
if datatype.requires_data_preparation:
# This module needs to be executed
class DataPreparationExecutor(BaseModuleExecutor):
def execute(self):
# Get the datatype instance
datatype_instance = self.info.get_output("data")
# Run the special data preparation method
datatype_instance.prepare_data(self.info.get_absolute_output_dir("data"), self.log)
DatatypeInputModuleInfo.module_executable = True
DatatypeInputModuleInfo.module_executor_override = DataPreparationExecutor
return DatatypeInputModuleInfo
[docs]class ReaderOutputType(IterableCorpus):
"""
A datatype for reading in input according to input module options and allowing it to
be iterated over by other modules.
Typically used together with `iterable_input_reader_factory()` as the output datatype.
``__len__`` should be overridden to take the processed input module options and return
the length of the corpus (number of documents).
``__iter__`` should use the processed input module options and return an iterator over the
corpus' documents (e.g. a generator function). Each item yielded should be a pair ``(doc_name, data)``
and ``data`` should be in the appropriate internal format associated with the document type.
``data_ready`` should be overridden to use the processed input module options and return True if the data
is ready to be read in.
In all cases, the input options are available as ``self.reader_options``.
"""
datatype_name = "reader_iterator"
#: Must be overridden by subclasses
data_point_type = None
#: Subclass information should be ignored for type checking. Should be treated exactly as an IterableCorpus
emulated_datatype = IterableCorpus
def __init__(self, reader_options, pipeline, **kwargs):
super(ReaderOutputType, self).__init__(None, pipeline, **kwargs)
self.reader_options = reader_options
[docs] def data_ready(self):
# Override to compute determine whether data is ready, using self.reader_options
raise NotImplementedError()
def __len__(self):
# Override to compute length using self.reader_options
raise NotImplementedError()
def __iter__(self):
# Override to iterate over documents using self.reader_options
raise NotImplementedError()
[docs]def iterable_input_reader_factory(input_module_options, output_type, module_type_name=None, module_readable_name=None):
"""
Factory for creating an input reader module type. This is a non-executable module that has no
inputs. It reads its data from some external location, using the given module options. The resulting
dataset is an IterableCorpus subtype, with the given document type.
``output_type`` is a datatype that performs the actual iteration over the data and is instantiated
with the processed options as its first argument. This is typically created by subclassing ReaderOutputType
and providing len, iter and data_ready methods.
**How is this different from ``input_module_factory``?** This method is used in your module code
to prepare a ModuleInfo class for reading a particular type of input data and presenting it as a
Pimlico dataset of the given type. ``input_module_factory``, on the other hand, is used by Pimlico
when you specify a datatype as a module type in a config file.
Note that, in future versions, reading datasets output by another Pimlico pipeline will be the only
purpose for that special notation. The possibility of specifying ``input_module_options`` to create
an input reader will disappear, so the use of ``input_module_options`` should be phased out and replaced
with input reader modules, such as those created by this factory.
"""
dp_type = output_type.data_point_type
mt_name = module_type_name or "reader_for_{}".format(dp_type.__name__)
mr_name = module_readable_name or "Input reader for {} iterable corpus".format(dp_type.__name__)
class IterableInputReaderModuleInfo(InputModuleInfo):
module_type_name = mt_name
module_readable_name = mr_name
module_outputs = [("corpus", output_type)]
module_options = input_module_options
def instantiate_output_datatype(self, output_name, output_datatype, **kwargs):
return output_type(self.options, self.pipeline)
return IterableInputReaderModuleInfo