# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html
from Queue import Empty
from time import sleep
from pimlico.core.config import PipelineStructureError
from pimlico.core.modules.base import load_module_executor, BaseModuleInfo
from pimlico.core.modules.execute import ModuleExecutionError
from pimlico.core.modules.map import InputQueueFeeder
from pimlico.datatypes.base import InvalidDocument
from pimlico.datatypes.tar import TarredCorpus, AlignedTarredCorpora
from pimlico.utils.pipes import qget
class DocumentMapOutputTypeWrapper(object):
    """
    Mixin that makes one output of a document map module look like a corpus datatype whose documents
    are produced on the fly. The wrapped module's processing is run over its inputs as the data is
    iterated over, rather than the output being read from disk.

    Not used directly: ``_wrap_output()`` creates a subclass of both this class and the wrapped output's
    datatype, filling in the three class attributes below.
    """
    # The datatype this simulates (also a base class of the generated subclass), for typechecking
    non_filter_datatype = None
    # Module info of the document map module whose processing is run on the fly
    wrapped_module_info = None
    # Name of the wrapped module's output that this datatype provides
    output_name = None

    def __init__(self, *args, **kwargs):
        """
        :param raw_data: keyword argument, default False. If True, documents are yielded as the raw data
            produced by the output writer, without being passed back through the output datatype's
            ``process_document()``.

        All other arguments are passed through to the constructor of ``non_filter_datatype``.
        """
        raw_data = kwargs.pop("raw_data", False)
        # We should be a subclass of this datatype, so call its constructor
        self.non_filter_datatype.__init__(self, *args, **kwargs)
        self.raw_data = raw_data
        # Position of our output among the wrapped module's outputs. Used to pick our item out of tuple
        # results (presumably produced by modules with several outputs).
        available_output_names = [name for name, __ in self.wrapped_module_info.available_outputs]
        self.output_num = available_output_names.index(self.output_name)
        # NOTE(review): not read anywhere in this class; kept in case other code relies on the attribute
        self._input_iterator = None
        # Get hold of the outputs from the previous modules to iterate over them
        self.input_corpora = [self.wrapped_module_info.get_input(input_name)
                              for input_name in self.wrapped_module_info.input_names]
        self.input_iterator = AlignedTarredCorpora(self.input_corpora)

    def __len__(self):
        # Delegate to input datatypes: one output document is produced per aligned input document
        return len(self.input_iterator)

    def __iter__(self):
        """
        Iterate over ``(doc_name, doc)`` pairs, dropping the archive name yielded by ``archive_iter()``.
        """
        for __, doc_name, doc in self.archive_iter():
            yield doc_name, doc

    def archive_iter(self, subsample=None, start_after=None):
        """
        Provides an iterator just like TarredCorpus, but instead of iterating over data read from disk,
        gets it on the fly from the input datatype.

        The wrapped module's executor is loaded and run with a pool of a single process. Input documents
        are fed to the pool by an ``InputQueueFeeder``, and each result is yielded as soon as it comes back.

        :param subsample: passed through to the input corpora's ``archive_iter()``
        :param start_after: passed through to the input corpora's ``archive_iter()``
        :return: generator of ``(archive_name, doc_name, doc)`` tuples. ``doc`` is the ``InvalidDocument``
            itself if processing produced one. Otherwise it is the document for this wrapper's output:
            raw data if ``raw_data`` was set, else the result of the output datatype's
            ``process_document()``.
        :raises ModuleExecutionError: wrapping any error raised during processing
        """
        # To make sure we're ready to iterate over the input data and have all the metadata we need, we must
        # actually create the output writer now
        output_dir = self.wrapped_module_info.get_absolute_output_dir(self.output_name)
        # Get hold of the outputs from the previous modules to iterate over them
        input_iterator = self.input_iterator
        # Load an executor for the module we're wrapping, so we can use some of its functionality
        executor_cls = load_module_executor(self.wrapped_module_info)
        executor = executor_cls(self.wrapped_module_info)
        executor.log.info(
            "Preparing document map execution for filter module %s" % self.wrapped_module_info.module_name
        )
        # Call the set-up routine, if one's been defined
        executor.preprocess()
        # Start up a processing pool, but only ever use a single process, so that results come back in
        # the same order as the inputs were fed in
        executor.pool = executor.create_pool(1)

        complete = False
        # NOTE(review): invalid_inputs is never incremented, so the final log line always reports 0 invalid
        # input documents. TODO: count them, or drop the figure from the log message.
        invalid_inputs = 0
        invalid_outputs = 0
        try:
            # Prepare a corpus writer for the output
            with self.wrapped_module_info.get_writer(self.output_name, output_dir) as writer:
                # Now that we've created the writer, we should be able to initialize a corresponding reader.
                # We can't read any documents from it, but we can use its postprocessing function.
                dummy_reader = self.wrapped_module_info.get_output(self.output_name)
                # Set a thread going to feed things onto the input queue
                input_feeder = InputQueueFeeder(
                    executor.pool.input_queue,
                    input_iterator.archive_iter(subsample=subsample, start_after=start_after),
                    complete_callback=executor.pool.notify_no_more_inputs
                )
                # Wait to make sure the input feeder's fed something into the input queue
                input_feeder.started.wait()

                # Check what document we're looking for next
                next_document = input_feeder.get_next_output_document()
                while next_document is not None:
                    result = self._await_pool_result(executor.pool, input_feeder)
                    # We're only using 1 process, so this must be the result for the next document: check
                    if (result.archive, result.filename) != next_document:
                        raise ModuleExecutionError(
                            "something went wrong with filter execution. "
                            "Expected result for %s/%s, but got %s/%s"
                            % (next_document[0], next_document[1], result.archive, result.filename)
                        )
                    if isinstance(result, InvalidDocument):
                        # Just got a single invalid document out: pass it straight on
                        invalid_outputs += 1
                        yield result.archive, result.filename, result
                    else:
                        # Here the normal executor would write the outputs to disk.
                        # Instead we simply yield the one we're interested in.
                        yield result.archive, result.filename, self._result_to_output_doc(
                            result, writer, dummy_reader)
                    # Check what document we're waiting for now
                    next_document = input_feeder.get_next_output_document()

                # We get a None next_document if there's an error in the input feeder at the beginning.
                # Check whether this has happened.
                input_feeder.check_for_error()
            # Only mark as complete once the writer has also been closed without error
            complete = True
        except Exception as e:
            # Any other uncaught exception should be passed up as a ModuleExecutionError, since we're actually
            # executing a module here, even though we're pretending to iterate over data.
            # This causes the restart procedure to catch the error just as if something went wrong in map
            # execution.
            raise ModuleExecutionError("error in filter %s: %s" % (self.wrapped_module_info.module_name, e),
                                       cause=e, debugging_info=self._exception_debugging_info(e))
        finally:
            # Call the finishing-off routine, if one's been defined
            executor.postprocess(error=not complete)
            executor.log.info("[%s filter] Input contained %d invalid documents, output contained %d" %
                              (self.wrapped_module_info.module_name, invalid_inputs, invalid_outputs))

    def _await_pool_result(self, pool, input_feeder):
        """
        Block until a result is available on the pool's output queue and return it.

        Between polls (0.2s timeout each), check for errors from the worker process and from the input
        feeder. Raising them means a failure cannot leave us waiting for ever.

        :raises ModuleExecutionError: if the worker process reported an error
        """
        while True:
            try:
                # Wait a little bit to see if there's a result available
                return qget(pool.output_queue, timeout=0.2)
            except Empty:
                pass
            # Timed out: check there's not been an error in one of the processes
            try:
                error = pool.exception_queue.get_nowait()
            except Empty:
                # No error from the worker: make sure the input feeder hasn't failed, then keep waiting
                input_feeder.check_for_error()
            else:
                # Got an error from a process: raise it.
                # First empty the exception queue, in case there were multiple errors.
                sleep(0.05)
                while not pool.exception_queue.empty():
                    qget(pool.exception_queue, timeout=0.1)
                raise ModuleExecutionError("error in worker process: %s" % error,
                                           cause=error, debugging_info=self._exception_debugging_info(error))

    def _result_to_output_doc(self, result, writer, dummy_reader):
        """
        Convert a valid processing result into the document for this wrapper's output.

        If ``result.data`` is a tuple, the item at ``output_num`` is taken. The writer converts it to raw
        data, as if it were about to be written to disk. Unless ``raw_data`` was requested, it is then
        converted back using the output datatype's ``process_document()``.
        """
        if type(result.data) is tuple:
            data = result.data[self.output_num]
        else:
            data = result.data
        # Use the writer to convert it from what it expects from the processing to raw text
        data = writer.document_to_raw_data(data)
        if not self.raw_data:
            # It may seem a waste of time to convert to and from text, but sometimes the conversions
            # are not symmetrical: e.g. the output writer might produce raw output and not process
            # it before writing to disk
            data = dummy_reader.process_document(data)
        return data

    @staticmethod
    def _exception_debugging_info(error):
        """
        Return any debugging info attached to an exception.

        Prefers a ``debugging_info`` attribute (attached at some lower level), then a ``traceback``
        attribute (sometimes included from within a worker process). Returns None if neither is present.
        """
        if hasattr(error, "debugging_info"):
            return error.debugging_info
        elif hasattr(error, "traceback"):
            return error.traceback
        return None

    def data_ready(self):
        """
        Ready to supply this data as soon as all the wrapper module's inputs are ready to produce their data.
        """
        return all(input_type.data_ready()
                   for input_name in self.wrapped_module_info.input_names
                   for input_type in self.wrapped_module_info.get_input(input_name, always_list=True))
def _wrap_output(module_info_instance, inner_output_name):
    """
    Build a datatype class that provides one output of a document map module by running the module's
    processing on the fly, instead of reading the output from disk.

    The class returned subclasses both DocumentMapOutputTypeWrapper and the output's own datatype, with
    the wrapper's class attributes filled in for this module and output.

    :param module_info_instance: module info of the document map module being treated as a filter
    :param inner_output_name: name of the output of that module to wrap
    :return: the newly created wrapper class
    :raises PipelineStructureError: if the output's datatype is not a subclass of TarredCorpus
    """
    _unused_name, wrapped_datatype = module_info_instance.get_output_datatype(inner_output_name)
    # Only TarredCorpus outputs of a document map can be produced document-by-document like this
    if not issubclass(wrapped_datatype, TarredCorpus):
        raise PipelineStructureError("problem treating module '%s' as a filter. Tried to wrap output '%s' with a "
                                     "datatype that produces the output on the fly, but it's not a subclass of "
                                     "TarredCorpus" % (module_info_instance.module_name, inner_output_name))

    # Using type() rather than a class statement lets the generated class carry an informative name
    wrapper_name = "%sFilterWrapper" % wrapped_datatype.__name__
    # The wrapper comes first, so its methods take precedence over the wrapped datatype's
    wrapper_bases = (DocumentMapOutputTypeWrapper, wrapped_datatype)
    wrapper_attrs = {
        "non_filter_datatype": wrapped_datatype,
        "wrapped_module_info": module_info_instance,
        "output_name": inner_output_name,
    }
    return type(wrapper_name, wrapper_bases, wrapper_attrs)
def wrap_module_info_as_filter(module_info_instance):
    """
    Create a module info that stands in for a document map module used as a filter.

    The new module is not executable. Each of its outputs is a wrapped datatype (see ``_wrap_output()``)
    that runs the wrapped module's document processing on the fly as the data is iterated over.

    :param module_info_instance: module info of the document map module to wrap
    :return: an instance of a new ``ModuleInfo`` class. It shares the wrapped module's name, pipeline,
        inputs and module inputs, has one wrapped output per output of the original, and has no options
        or optional outputs.
    """
    # Wrap each of the output datatypes so that it executes the document processing on the fly
    filter_outputs = [(out_name, _wrap_output(module_info_instance, out_name))
                      for out_name in module_info_instance.output_names]

    class ModuleInfo(BaseModuleInfo):
        module_type_name = "%s_filter" % module_info_instance.module_type_name
        # The filter takes no options of its own and has no optional outputs
        module_options = []
        module_inputs = module_info_instance.module_inputs
        module_outputs = filter_outputs
        module_optional_outputs = []
        # Not executed as a module: its outputs are produced when iterated over
        module_executable = False

    return ModuleInfo(module_info_instance.module_name,
                      module_info_instance.pipeline,
                      inputs=module_info_instance.inputs,
                      options={},
                      optional_outputs=[])