# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html
from Queue import Empty
from time import sleep
from traceback import print_exc, format_exc
from pimlico.core.config import PipelineStructureError
from pimlico.core.modules.base import load_module_executor, BaseModuleInfo
from pimlico.core.modules.execute import ModuleExecutionError
from pimlico.core.modules.map import InputQueueFeeder, DocumentMapModuleInfo
from pimlico.datatypes.base import InvalidDocument
from pimlico.datatypes.tar import TarredCorpus, AlignedTarredCorpora
from pimlico.utils.pipes import qget
[docs]class DocumentMapOutputTypeWrapper(object):
# Tells you the datatype that this simulates, for typechecking
non_filter_datatype = None
wrapped_module_info = None
output_name = None
def __init__(self, *args, **kwargs):
raw_data = kwargs.pop("raw_data", False)
# We should be a subclass of this datatype, so call its constructor
self.non_filter_datatype.__init__(self, *args, **kwargs)
self.raw_data = raw_data
self.output_num = [name for name, type in self.wrapped_module_info.available_outputs].index(self.output_name)
self.multiple_outputs = len(self.wrapped_module_info.available_outputs) > 1
self._input_iterator = None
# Get hold of the outputs from the previous modules to iterate over them
self.input_corpora = [self.wrapped_module_info.get_input(input_name)
for input_name in self.wrapped_module_info.input_names]
self.input_iterator = AlignedTarredCorpora(self.input_corpora)
def __len__(self):
# Delegate to input datatypes
return len(self.input_iterator)
def __iter__(self):
for __, doc_name, doc in self.archive_iter():
yield doc_name, doc
[docs] def archive_iter(self, subsample=None, start_after=None):
"""
Provides an iterator just like TarredCorpus, but instead of iterating over data read from disk,
gets it on the fly from the input datatype.
"""
# To make sure we're ready to iterate over the input data and have all the metadata we need, we must
# actually create the output writer now
output_dir = self.wrapped_module_info.get_absolute_output_dir(self.output_name)
# Get hold of the outputs from the previous modules to iterate over them
input_iterator = self.input_iterator
# Load an executor for the module we're wrapping, so we can use some of its functionality
executor_cls = self.wrapped_module_info.load_executor()
executor = executor_cls(self.wrapped_module_info)
# Call the set-up routine, if one's been defined
executor.log.info(
"Preparing document map execution for filter module %s" % self.wrapped_module_info.module_name
)
executor.preprocess()
# Start up a processing pool, but only ever use a single process
executor.pool = executor.create_pool(1)
complete = False
invalid_inputs = 0
invalid_outputs = 0
input_feeder = None
try:
# Prepare a corpus writer for the output
with self.wrapped_module_info.get_writer(self.output_name, output_dir) as writer:
# Now that we've created the writer, we should be able to initialize a corresponding reader
# Of course, we can't read any documents from it, but we can use its postprocessing function
dummy_reader = self.wrapped_module_info.get_output(self.output_name)
# Set a thread going to feed things onto the input queue
input_feeder = InputQueueFeeder(
executor.pool.input_queue,
input_iterator.archive_iter(subsample=subsample, start_after=start_after),
complete_callback=executor.pool.notify_no_more_inputs
)
# Wait to make sure the input feeder's fed something into the input queue
input_feeder.started.wait()
# Check what document we're looking for next
next_document = input_feeder.get_next_output_document()
while next_document is not None:
# Wait for a document coming off the output queue
while True:
try:
# Wait a little bit to see if there's a result available
result = qget(executor.pool.output_queue, timeout=0.2)
except Empty:
# Timed out: check there's not been an error in one of the processes
try:
error = executor.pool.exception_queue.get_nowait()
except Empty:
# No error: just keep waiting
pass
else:
# Got an error from a process: raise it
# First empty the exception queue, in case there were multiple errors
sleep(0.05)
while not executor.pool.exception_queue.empty():
qget(executor.pool.exception_queue, timeout=0.1)
# Sometimes, a traceback from within the process is included
if hasattr(error, "debugging_info"):
# We've already attached debugging info at some lower level: just use it
debugging = error.debugging_info
elif hasattr(error, "traceback"):
debugging = error.traceback
else:
debugging = None
raise ModuleExecutionError("error in worker process: %s" % error,
cause=error, debugging_info=debugging)
input_feeder.check_for_error()
except:
raise
else:
# Got a result from the process
break
# We've got a result and, since we're only using 1 process, it must be the right one: check
if (result.archive, result.filename) != next_document:
raise ModuleExecutionError(
"something went wrong with filter execution. Expected result for %s/%s, but got %s/%s"
% (result.archive, result.filename, next_document[0], next_document[1])
)
# Next document processed: yield the result
if type(result) is InvalidDocument or \
(self.multiple_outputs and type(result.data[self.output_num]) is InvalidDocument) or \
(not self.multiple_outputs and type(result.data) is InvalidDocument):
invalid_outputs += 1
# Check whether the document was also invalid in the input
if input_feeder.check_invalid(result.archive, result.filename):
invalid_inputs += 1
# Just got a single invalid document out
yield result.archive, result.filename, result.data
else:
# Here the normal executor would write the outputs to disk
# Instead we simply yield the one we're interested in
# Use the writer to convert it from what it expects from the processing to raw text
if self.multiple_outputs:
data = result.data[self.output_num]
else:
data = result.data
# If we're outputting raw data, use the writer to convert the data structure to raw data
if self.raw_data:
data = writer.document_to_raw_data(data)
# Historical note:
# Previously, we did a little process here where we converted the doc to raw data and then
# back into the data structure required. This was to allow for writers that aren't symmetrical:
# they expect datatype A, convert it to raw data and write it, but the corresponding reader
# reads the raw data into datatype B. This is a stupid thing to do most of the time. It's
# also stupid to slow down the execution of almost every pipeline to allow for this niche
# case, so I've stopped doing it
yield result.archive, result.filename, data
# Check what document we're waiting for now
next_document = input_feeder.get_next_output_document()
# We get a None next_document if there's an error in the input feeder at the beginning
# Check whether this has happened
input_feeder.check_for_error()
complete = True
except Exception, e:
# Any other uncaught exception should be passed up as a ModuleExecutionError, since we're actually
# executing a module here, even though we're pretending to iterate over data
# This causes the restart procedure to catch the error just as if something went wrong in map execution
if hasattr(e, "debugging_info"):
# We've already attached debugging info at some lower level: just use it
debugging = e.debugging_info
elif hasattr(e, "traceback"):
debugging = e.traceback
else:
# Just include the stack trace from this process
debugging = format_exc()
raise ModuleExecutionError("error in filter %s: %s" % (self.wrapped_module_info.module_name, e),
cause=e, debugging_info=debugging)
finally:
# Call the finishing-off routine, if one's been defined
executor.postprocess(error=not complete)
executor.log.info("[%s filter] Input contained %d invalid documents, output contained %d" %
(self.wrapped_module_info.module_name, invalid_inputs, invalid_outputs))
if input_feeder is not None:
input_feeder.check_for_error()
input_feeder.shutdown()
# Also shut down the executor's worker pool
executor.pool.shutdown()
[docs] def data_ready(self):
"""
Ready to supply this data as soon as all the wrapper module's inputs are ready to produce their data.
"""
return all(input_type.data_ready()
for input_name in self.wrapped_module_info.input_names
for input_type in self.wrapped_module_info.get_input(input_name, always_list=True))
def _wrap_output(module_info_instance, inner_output_name):
__, output_datatype = module_info_instance.get_output_datatype(inner_output_name)
if not issubclass(output_datatype, TarredCorpus):
# Can only wrap TarredCorpus outputs of a document map
raise PipelineStructureError("problem treating module '%s' as a filter. Tried to wrap output '%s' with a "
"datatype that produces the output on the fly, but it's not a subclass of "
"TarredCorpus" % (module_info_instance.module_name, inner_output_name))
# Create a special subclass of the general output wrapper for this output
# Doing so using type() instead of a class def allows us to give it an informative class name
wrapper_cls = type(
"%sFilterWrapper" % output_datatype.__name__,
(DocumentMapOutputTypeWrapper, output_datatype),
dict(
non_filter_datatype=output_datatype,
wrapped_module_info=module_info_instance,
output_name=inner_output_name,
)
)
return wrapper_cls
[docs]def wrap_module_info_as_filter(module_info_instance):
"""
Create a filter module from a document map module so that it gets executed on the fly to provide its
outputs as input to later modules. Can be applied to any document map module simply by adding `filter=T`
to its config.
This function is called when `filter=T` is given.
:param module_info_instance: basic module info to wrap the outputs of
:return: a new non-executable ModuleInfo whose outputs are produced on the fly and will be identical to
the outputs of the wrapper module.
"""
# Check that this is a document map module: otherwise it doesn't make sense to wrap it
if not isinstance(module_info_instance, DocumentMapModuleInfo):
raise PipelineStructureError("cannot create a filter from a %s module, as it's not a document map module "
"(tried to run module '%s' as a filter)" %
(module_info_instance.module_type_name, module_info_instance.module_name))
# Wrap each of the output datatypes so that it executes the document processing on the fly
wrapped_outputs = []
for output_name in module_info_instance.output_names:
wrapped_outputs.append((output_name, _wrap_output(module_info_instance, output_name)))
class ModuleInfo(BaseModuleInfo):
module_type_name = "%s_filter" % module_info_instance.module_type_name
module_options = []
module_inputs = module_info_instance.module_inputs
module_outputs = wrapped_outputs
module_optional_outputs = []
module_executable = False
info = ModuleInfo(
module_info_instance.module_name,
module_info_instance.pipeline,
inputs=module_info_instance.inputs,
options={},
optional_outputs=[]
)
# Pass through module variables
info.module_variables = module_info_instance.module_variables
return info