Source code for pimlico.core.modules.map.filter

# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html

from Queue import Empty
from time import sleep
from traceback import format_exc

from pimlico.core.config import PipelineStructureError
from pimlico.core.modules.base import load_module_executor, BaseModuleInfo
from pimlico.core.modules.execute import ModuleExecutionError
from pimlico.core.modules.map import InputQueueFeeder, DocumentMapModuleInfo
from pimlico.datatypes.base import InvalidDocument
from pimlico.datatypes.tar import TarredCorpus, AlignedTarredCorpora
from pimlico.utils.pipes import qget


class DocumentMapOutputTypeWrapper(object):
    # Tells you the datatype that this simulates, for typechecking
    non_filter_datatype = None
    wrapped_module_info = None
    output_name = None

    def __init__(self, *args, **kwargs):
        raw_data = kwargs.pop("raw_data", False)
        # We should be a subclass of this datatype, so call its constructor
        self.non_filter_datatype.__init__(self, *args, **kwargs)
        self.raw_data = raw_data

        self.output_num = [name for name, dtype in self.wrapped_module_info.available_outputs].index(self.output_name)
        self.multiple_outputs = len(self.wrapped_module_info.available_outputs) > 1
        self._input_iterator = None
        # Get hold of the outputs from the previous modules to iterate over them
        self.input_corpora = [self.wrapped_module_info.get_input(input_name)
                              for input_name in self.wrapped_module_info.input_names]
        self.input_iterator = AlignedTarredCorpora(self.input_corpora)

    def __len__(self):
        # Delegate to the input datatypes
        return len(self.input_iterator)

    def __iter__(self):
        for __, doc_name, doc in self.archive_iter():
            yield doc_name, doc
    def archive_iter(self, subsample=None, start_after=None):
        """
        Provides an iterator just like TarredCorpus, but instead of iterating over data read from disk,
        gets it on the fly from the input datatype. (A usage sketch follows the class definition below.)

        """
        # To make sure we're ready to iterate over the input data and have all the metadata we need, we must
        # actually create the output writer now
        output_dir = self.wrapped_module_info.get_absolute_output_dir(self.output_name)
        # Get hold of the outputs from the previous modules to iterate over them
        input_iterator = self.input_iterator

        # Load an executor for the module we're wrapping, so we can use some of its functionality
        executor_cls = self.wrapped_module_info.load_executor()
        executor = executor_cls(self.wrapped_module_info)

        executor.log.info("Preparing document map execution for filter module %s" %
                          self.wrapped_module_info.module_name)
        # Call the set-up routine, if one's been defined
        executor.preprocess()

        # Start up a processing pool, but only ever use a single process
        executor.pool = executor.create_pool(1)

        complete = False
        invalid_inputs = 0
        invalid_outputs = 0
        try:
            # Prepare a corpus writer for the output
            with self.wrapped_module_info.get_writer(self.output_name, output_dir) as writer:
                # Now that we've created the writer, we should be able to initialize a corresponding reader
                # Of course, we can't read any documents from it, but we can use its postprocessing function
                dummy_reader = self.wrapped_module_info.get_output(self.output_name)
                # Set a thread going to feed things onto the input queue
                input_feeder = InputQueueFeeder(
                    executor.pool.input_queue,
                    input_iterator.archive_iter(subsample=subsample, start_after=start_after),
                    complete_callback=executor.pool.notify_no_more_inputs
                )
                # Wait to make sure the input feeder's fed something into the input queue
                input_feeder.started.wait()
                # Check what document we're looking for next
                next_document = input_feeder.get_next_output_document()
                while next_document is not None:
                    # Wait for a document coming off the output queue
                    while True:
                        try:
                            # Wait a little bit to see if there's a result available
                            result = qget(executor.pool.output_queue, timeout=0.2)
                        except Empty:
                            # Timed out: check there's not been an error in one of the processes
                            try:
                                error = executor.pool.exception_queue.get_nowait()
                            except Empty:
                                # No error: just keep waiting
                                pass
                            else:
                                # Got an error from a process: raise it
                                # First empty the exception queue, in case there were multiple errors
                                sleep(0.05)
                                while not executor.pool.exception_queue.empty():
                                    qget(executor.pool.exception_queue, timeout=0.1)
                                # Sometimes, a traceback from within the process is included
                                if hasattr(error, "debugging_info"):
                                    # We've already attached debugging info at some lower level: just use it
                                    debugging = error.debugging_info
                                elif hasattr(error, "traceback"):
                                    debugging = error.traceback
                                else:
                                    debugging = None
                                raise ModuleExecutionError("error in worker process: %s" % error,
                                                           cause=error, debugging_info=debugging)
                            input_feeder.check_for_error()
                        except:
                            raise
                        else:
                            # Got a result from the process
                            break
                    # We've got a result and, since we're only using 1 process, it must be the right one: check
                    if (result.archive, result.filename) != next_document:
                        raise ModuleExecutionError(
                            "something went wrong with filter execution. Expected result for %s/%s, but got "
                            "%s/%s" % (next_document[0], next_document[1], result.archive, result.filename)
                        )
                    # Next document processed: yield the result
                    if type(result) is InvalidDocument:
                        invalid_outputs += 1
                        # Just got a single invalid document out
                        yield result.archive, result.filename, result
                    else:
                        # Here the normal executor would write the outputs to disk
                        # Instead we simply yield the one we're interested in
                        # Use the writer to convert it from what it expects from the processing to raw text
                        if self.multiple_outputs:
                            data = result.data[self.output_num]
                        else:
                            data = result.data
                        data = writer.document_to_raw_data(data)

                        if not self.raw_data:
                            # If not outputting raw data, now use the output datatype to convert back from raw text
                            # It may seem a waste of time to convert to and from text, but sometimes the conversions
                            # are not symmetrical: e.g. the output writer might produce raw output and not process
                            # it before writing to disk
                            data = dummy_reader.process_document(data)

                        yield result.archive, result.filename, data

                    # Check what document we're waiting for now
                    next_document = input_feeder.get_next_output_document()
                # We get a None next_document if there's an error in the input feeder at the beginning
                # Check whether this has happened
                input_feeder.check_for_error()
            complete = True
        except Exception as e:
            # Any other uncaught exception should be passed up as a ModuleExecutionError, since we're actually
            # executing a module here, even though we're pretending to iterate over data
            # This causes the restart procedure to catch the error just as if something went wrong in map execution
            if hasattr(e, "debugging_info"):
                # We've already attached debugging info at some lower level: just use it
                debugging = e.debugging_info
            elif hasattr(e, "traceback"):
                debugging = e.traceback
            else:
                # Just include the stack trace from this process
                debugging = format_exc()
            raise ModuleExecutionError("error in filter %s: %s" % (self.wrapped_module_info.module_name, e),
                                       cause=e, debugging_info=debugging)
        finally:
            # Call the finishing-off routine, if one's been defined
            executor.postprocess(error=not complete)
            executor.log.info("[%s filter] Input contained %d invalid documents, output contained %d" %
                              (self.wrapped_module_info.module_name, invalid_inputs, invalid_outputs))
    def data_ready(self):
        """
        Ready to supply this data as soon as all of the wrapped module's inputs are ready to produce
        their data.

        """
        return all(input_type.data_ready()
                   for input_name in self.wrapped_module_info.input_names
                   for input_type in self.wrapped_module_info.get_input(input_name, always_list=True))
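
# Illustrative usage sketch (not part of the original module; the pipeline and output
# names here are hypothetical). A downstream module consumes the wrapper exactly as it
# would a TarredCorpus read from disk, except that each document is computed on the fly
# by the wrapped module's single worker process:
#
#     tokens = pipeline["tokenize"].get_output("documents")  # a DocumentMapOutputTypeWrapper
#     for archive_name, doc_name, doc in tokens.archive_iter():
#         ...  # invalid results come through as InvalidDocument instances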


def _wrap_output(module_info_instance, inner_output_name):
    __, output_datatype = module_info_instance.get_output_datatype(inner_output_name)
    if not issubclass(output_datatype, TarredCorpus):
        # Can only wrap TarredCorpus outputs of a document map
        raise PipelineStructureError("problem treating module '%s' as a filter. Tried to wrap output '%s' with a "
                                     "datatype that produces the output on the fly, but it's not a subclass of "
                                     "TarredCorpus" % (module_info_instance.module_name, inner_output_name))
    # Create a special subclass of the general output wrapper for this output
    # Doing so using type() instead of a class def allows us to give it an informative class name
    wrapper_cls = type(
        "%sFilterWrapper" % output_datatype.__name__,
        (DocumentMapOutputTypeWrapper, output_datatype),
        dict(
            non_filter_datatype=output_datatype,
            wrapped_module_info=module_info_instance,
            output_name=inner_output_name,
        )
    )
    return wrapper_cls
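
# The three-argument type() call above is equivalent to a class statement. For a
# hypothetical module whose "documents" output has datatype TokenizedCorpus, the
# generated wrapper is roughly:
#
#     class TokenizedCorpusFilterWrapper(DocumentMapOutputTypeWrapper, TokenizedCorpus):
#         non_filter_datatype = TokenizedCorpus
#         wrapped_module_info = module_info_instance
#         output_name = "documents"
#
# The MRO puts DocumentMapOutputTypeWrapper first, so its archive_iter() and
# data_ready() override the disk-reading versions, while isinstance/issubclass checks
# against the original datatype still succeed.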


def wrap_module_info_as_filter(module_info_instance):
    """
    Create a filter module from a document map module so that it gets executed on the fly to provide its
    outputs as input to later modules. Can be applied to any document map module simply by adding `filter=T`
    to its config: this function is called when `filter=T` is given.

    :param module_info_instance: basic module info to wrap the outputs of
    :return: a new non-executable ModuleInfo whose outputs are produced on the fly and will be identical to
        the outputs of the wrapped module
    """
    # Check that this is a document map module: otherwise it doesn't make sense to wrap it
    if not isinstance(module_info_instance, DocumentMapModuleInfo):
        raise PipelineStructureError("cannot create a filter from a %s module, as it's not a document map module "
                                     "(tried to run module '%s' as a filter)" %
                                     (module_info_instance.module_type_name, module_info_instance.module_name))
    # Wrap each of the output datatypes so that it executes the document processing on the fly
    wrapped_outputs = []
    for output_name in module_info_instance.output_names:
        wrapped_outputs.append((output_name, _wrap_output(module_info_instance, output_name)))

    class ModuleInfo(BaseModuleInfo):
        module_type_name = "%s_filter" % module_info_instance.module_type_name
        module_options = []
        module_inputs = module_info_instance.module_inputs
        module_outputs = wrapped_outputs
        module_optional_outputs = []
        module_executable = False

    return ModuleInfo(
        module_info_instance.module_name,
        module_info_instance.pipeline,
        inputs=module_info_instance.inputs,
        options={},
        optional_outputs=[]
    )
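
# Example pipeline config (hypothetical module type and input names): adding filter=T
# to any document map module's section causes wrap_module_info_as_filter() to be
# applied when the config is loaded, so the module's output is never written to disk:
#
#     [tokenize]
#     type=pimlico.modules.opennlp.tokenize
#     input=raw_text
#     filter=T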