Source code for pimlico.core.modules.inputs
# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html
"""
Base classes and utilities for input modules in a pipeline.
"""
import copy
from pimlico.core.config import PipelineStructureError
from pimlico.datatypes.base import IterableCorpus
from .base import BaseModuleInfo
from pimlico.core.modules.base import BaseModuleExecutor
[docs]class InputModuleInfo(BaseModuleInfo):
"""
Base class for input modules. These don't get executed in general, they just provide a way to iterate over
input data.
You probably don't want to subclass this. It's usually simplest to define a datatype for reading the input
data and then just specify its class as the module's type. This results in a subclass of this module info
being created dynamically to read that data.
Note that module_executable is typically set to False and the base class does this. However, some input
modules need to be executed before the input is usable, for example to collect stats about the input
data.
"""
module_type_name = "input"
module_executable = False
[docs] def instantiate_output_datatype(self, output_name, output_datatype):
raise NotImplementedError("input module type (%s) must implement its own datatype instantiator" %
self.module_type_name)
[docs]def input_module_factory(datatype):
"""
Create an input module class to load a given datatype.
"""
input_module_options = copy.copy(datatype.input_module_options)
if issubclass(datatype, IterableCorpus):
# Also get input options from the document type
input_module_options.update(datatype.data_point_type.input_module_options)
# Add a special option to allow a dir to be specified to read the data from
# This will become the base_dir for the datatype when instantiated
input_module_options["dir"] = {
"help": "Directory to read the data from. May be used to load a dataset from an output from another "
"Pimlico pipeline. If not given, the datatype's base dir will be the expected base dir within "
"this pipeline's data directory, which usually won't exist",
}
class DatatypeInputModuleInfo(InputModuleInfo):
module_type_name = "%s_input" % datatype.datatype_name
module_readable_name = "%s datatype input" % datatype.datatype_name
module_outputs = [("data", datatype)]
module_options = input_module_options
def __init__(self, module_name, pipeline, **kwargs):
super(DatatypeInputModuleInfo, self).__init__(module_name, pipeline, **kwargs)
self.override_base_dir = self.options["dir"]
def get_output_dir(self, output_name, short_term_store=False):
if self.override_base_dir is None:
return super(DatatypeInputModuleInfo, self).get_output_dir(output_name,
short_term_store=short_term_store)
else:
return self.override_base_dir
def instantiate_output_datatype(self, output_name, output_datatype):
return output_datatype.create_from_options(self.get_output_dir(output_name), self.pipeline,
copy.deepcopy(self.options))
if datatype.requires_data_preparation:
# This module needs to be executed
class DataPreparationExecutor(BaseModuleExecutor):
def execute(self):
# Get the datatype instance
datatype_instance = self.info.get_output("data")
# Run the special data preparation method
datatype_instance.prepare_data(self.info.get_absolute_output_dir("data"), self.log)
DatatypeInputModuleInfo.module_executable = True
DatatypeInputModuleInfo.module_executor_override = DataPreparationExecutor
return DatatypeInputModuleInfo