import threading
from Queue import Queue, Empty
from threading import Thread
from time import sleep
from traceback import format_exc
from pimlico.core.config import PipelineStructureError
from pimlico.core.modules.base import BaseModuleInfo, BaseModuleExecutor, satisfies_typecheck
from pimlico.core.modules.execute import ModuleExecutionError, StopProcessing
from pimlico.datatypes.base import InvalidDocument
from pimlico.datatypes.tar import TarredCorpus, AlignedTarredCorpora, TarredCorpusWriter
from pimlico.utils.core import multiwith
from pimlico.utils.pipes import qget
from pimlico.utils.progress import get_progress_bar


class DocumentMapModuleInfo(BaseModuleInfo):
"""
Abstract module type that maps each document in turn in a corpus. It produces a single output
    document for every input.

Subclasses should specify the input types, which should all be subclasses of
TarredCorpus, and output types, the first of which (i.e. default) should also be a
subclass of TarredCorpus. The base class deals with iterating over the input(s) and
writing the outputs to a new TarredCorpus. The subclass only needs to implement the
mapping function applied to each document (in its executor).
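
    A minimal sketch of a subclass (the type name and datatypes are purely
    illustrative)::

        class MyMapperModuleInfo(DocumentMapModuleInfo):
            module_type_name = "my_mapper"
            module_inputs = [("documents", TarredCorpus)]
            module_outputs = [("documents", TarredCorpus)]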
"""

    # Most subclasses will want to override this to give a more specific datatype for the output
    module_outputs = [("documents", TarredCorpus)]

    def __init__(self, module_name, pipeline, **kwargs):
super(DocumentMapModuleInfo, self).__init__(module_name, pipeline, **kwargs)
# Load all the input datatypes now in order to perform typechecking
# Note that we don't store these now, but reload them when we start execution, as it's important
# that they've been freshly instantiated prior to execution
        # Otherwise they can end up caching things that get changed by other modules executed first
self._load_inputs()

    def _load_inputs(self):
# Prepare the list of document map inputs that will be fed into the executor
# We may have multiple inputs, which should be aligned tarred corpora
# If there's only one, this also works
inputs = [self.get_input(input_name) for input_name in self.input_names]
# We also allow (additional) inputs that are not tarred corpora, which get left out of this
datasets = [corpus for corpus in inputs if satisfies_typecheck(corpus, TarredCorpus)]
if len(datasets) == 0:
raise PipelineStructureError(
"document map module '%s' got no TarredCorpus instances among its inputs" % self.module_name)
return datasets

    input_corpora = property(_load_inputs)

    def get_writer(self, output_name, output_dir, append=False):
"""
Get the writer instance that will be given processed documents to write. Should return
a subclass of TarredCorpusWriter. The default implementation instantiates a plain
TarredCorpusWriter.
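
        For example, a subclass might return a custom writer (``MyFormatWriter``
        is hypothetical)::

            def get_writer(self, output_name, output_dir, append=False):
                return MyFormatWriter(output_dir, append=append)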
"""
return TarredCorpusWriter(output_dir, append=append)

    def get_writers(self, append=False):
# Only include the outputs that are tarred corpus types
# This allows there to be other outputs aside from those mapped to
outputs = [name for name in self.output_names if isinstance(self.get_output(name), TarredCorpus)]
return tuple(self.get_writer(name, self.get_absolute_output_dir(name), append=append) for name in outputs)

    def get_detailed_status(self):
status_lines = super(DocumentMapModuleInfo, self).get_detailed_status()
if self.status == "PARTIALLY_PROCESSED":
status_lines.append("Processed %d documents" % self.get_metadata()["docs_completed"])
status_lines.append("Last doc completed: %s" % self.get_metadata()["last_doc_completed"])
return status_lines


class DocumentMapModuleExecutor(BaseModuleExecutor):
"""
    Base class for executors for document map modules. Subclasses should provide the behaviour
    for each individual document by defining a pool (and worker processes) to handle the documents as
    they're fed into it.

    Note that in most cases it won't be necessary to override the pool and worker base classes yourself.
    Unless you need special behaviour, use the standard implementations and factory functions.

    Although the pattern of execution for all document map modules is based on parallel processing (creating a pool,
    spawning worker processes, etc.), this doesn't mean that all such modules have to be parallelizable. If you
    have no reason not to parallelize, it's recommended that you do (with single-process execution as a special
    case). However, sometimes parallelizing isn't so simple: in these cases, consider using the tools in
    :mod:`.singleproc`.
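
    An illustrative sketch of a simple subclass (``MyProcessorPool`` is assumed
    to be a DocumentProcessorPool subclass defined elsewhere)::

        class MyMapperExecutor(DocumentMapModuleExecutor):
            def preprocess(self):
                self.log.info("Loading resources")

            def create_pool(self, processes):
                return MyProcessorPool(processes)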
"""

    def __init__(self, module_instance_info, **kwargs):
super(DocumentMapModuleExecutor, self).__init__(module_instance_info, **kwargs)
self.input_corpora = self.info.input_corpora
self.input_iterator = AlignedTarredCorpora(self.input_corpora)

    def preprocess(self):
"""
Allows subclasses to define a set-up procedure to be called before corpus processing begins.
"""
pass

    def postprocess(self, error=False):
"""
        Allows subclasses to define a finishing procedure to be called once corpus processing has finished.
"""
pass

    def create_pool(self, processes):
"""
Should return an instance of the pool to be used for document processing. Should generally be a
        subclass of DocumentProcessorPool.

        Always called after preprocess().
"""
raise NotImplementedError

    def retrieve_processing_status(self):
# Check the metadata to see whether we've already partially completed this
if self.info.status == "PARTIALLY_PROCESSED":
docs_completed = self.info.get_metadata()["docs_completed"]
first_archive, __, first_filename = self.info.get_metadata()["last_doc_completed"].partition("/")
start_after = (first_archive, first_filename)
self.log.info(
"Module has been partially executed already; picking up where we left off, after doc %s/%s "
"(skipping %d docs, %d to process)" %
(start_after[0], start_after[1], docs_completed, (len(self.input_iterator) - docs_completed))
)
else:
docs_completed = 0
start_after = None
return docs_completed, start_after

    def update_processing_status(self, docs_completed, archive_name, filename):
self.info.set_metadata_values({
"status": "PARTIALLY_PROCESSED",
"last_doc_completed": "%s/%s" % (archive_name, filename),
"docs_completed": docs_completed,
})

    def execute(self):
# Call the set-up routine, if one's been defined
self.log.info("Preparing parallel document map execution with %d processes" % self.processes)
self.preprocess()
# Start up a pool
try:
self.pool = self.create_pool(self.processes)
except WorkerStartupError, e:
raise ModuleExecutionError(e.message, cause=e.cause, debugging_info=e.debugging_info)
complete = False
result_buffer = {}
input_feeder = None
docs_completed_now = 0
docs_completed_before, start_after = self.retrieve_processing_status()
total_to_process = len(self.input_iterator) - docs_completed_before
try:
# Prepare a corpus writer for the output
with multiwith(*self.info.get_writers(append=start_after is not None)) as writers:
if total_to_process < 1:
# No input documents, don't go any further
                    # We've come this far so that the writers get created and finishing-up still happens:
                    # we might need to finish writing metadata and the like if a previous run failed after
                    # all docs were done
self.log.info("No documents to process")
else:
pbar = get_progress_bar(total_to_process, counter=True,
title="%s map" % self.info.module_type_name.replace("_", " ").capitalize())
self.log.info("Starting execution on %d docs" % total_to_process)
# Inputs will be taken from this as they're needed
input_iter = iter(self.input_iterator.archive_iter(start_after=start_after))
# Set a thread going to feed things onto the input queue
input_feeder = InputQueueFeeder(self.pool.input_queue, input_iter,
complete_callback=self.pool.notify_no_more_inputs)
# Wait to make sure the input feeder's fed something into the input queue
input_feeder.started.wait()
# Check what document we're looking for next
next_document = input_feeder.get_next_output_document()
while next_document is not None:
# Wait for a document coming off the output queue
while True:
try:
# Wait a little bit to see if there's a result available
result = qget(self.pool.output_queue, timeout=0.2)
except Empty:
# Timed out: check there's not been an error in one of the processes
try:
error = self.pool.exception_queue.get_nowait()
except Empty:
# No error: just keep waiting
pass
else:
# Got an error from a process: raise it
# First empty the exception queue, in case there were multiple errors
sleep(0.05)
while not self.pool.exception_queue.empty():
qget(self.pool.exception_queue, timeout=0.1)
# Sometimes, a traceback from within the process is included
debugging = error.traceback if hasattr(error, "traceback") else None
raise ModuleExecutionError("error in worker process: %s" % error,
cause=error, debugging_info=debugging)
else:
# Got a result from a process
break
# We've got some result, but it might not be the one we're looking for
# Add it to a buffer, so we can potentially keep it and only output it when its turn comes up
result_buffer[(result.archive, result.filename)] = result.data
pbar.update(docs_completed_now + len(result_buffer))
# Write out as many as we can of the docs that have been sent and whose output is available
                            # while maintaining the order they went in
while next_document in result_buffer:
archive, filename = next_document
next_output = result_buffer.pop((archive, filename))
# Next document processed: output the result precisely as in the single-core case
if type(next_output) is InvalidDocument:
# Just got a single invalid document out: write it out to every output
next_output = [next_output] * len(writers)
elif type(next_output) is not tuple:
# If the processor produces a single result and there's only one output, fine
next_output = (next_output,)
if len(next_output) != len(writers):
raise ModuleExecutionError(
"%s executor's process_document() returned %d results for a document, but the "
"module has %d outputs" % (type(self).__name__, len(next_output), len(writers))
)
# Write the result to the output corpora
for result, writer in zip(next_output, writers):
writer.add_document(archive, filename, result)
# Update the module's metadata to say that we've completed this document
docs_completed_now += 1
self.update_processing_status(docs_completed_before+docs_completed_now, archive, filename)
# Check what document we're waiting for now
next_document = input_feeder.get_next_output_document()
pbar.finish()
complete = True
except ModuleExecutionError, e:
if self.info.status == "PARTIALLY_PROCESSED":
self.log.info("Processed documents recorded: restart processing where you left off by calling run "
"again once you've fixed the problem (%d docs processed in this run, %d processed in "
"total)" % (docs_completed_now, docs_completed_before+docs_completed_now))
# Set the end status so that the top-level routine doesn't replace it with a generic failure status
e.end_status = self.info.status
raise
finally:
# Call the finishing-off routine, if one's been defined
if complete:
self.log.info("Document mapping complete. Finishing off")
else:
self.log.info("Document mapping failed. Finishing off")
self.postprocess(error=not complete)
if input_feeder is not None:
input_feeder.cancel()
input_feeder.empty_all_queues()


def skip_invalid(fn):
"""
Decorator to apply to document map executor process_document() methods where you want to skip doing any
processing if any of the input documents are invalid and just pass through the error information.
    Be careful not to confuse this with the process_document() methods on datatypes. You don't need a decorator
    on them to skip invalid documents, as they are never called on invalid documents anyway.
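
    A usage sketch, where ``do_something()`` stands in for the real per-document
    processing::

        class MyWorker(DocumentMapProcessMixin):
            @skip_invalid
            def process_document(self, archive, filename, doc):
                return do_something(doc)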
"""
def _fn(self, archive, filename, *docs):
invalid = [doc for doc in docs if type(doc) is InvalidDocument]
if len(invalid):
# If there's more than one InvalidDocument among the inputs, just return the first one
return invalid[0]
else:
return fn(self, archive, filename, *docs)
return _fn


def skip_invalids(fn):
"""
Decorator to apply to document map executor process_documents() methods where you want to skip doing any
processing if any of the input documents are invalid and just pass through the error information.
"""
def _fn(self, input_tuples):
invalids = [[doc for doc in args[2:] if type(doc) is InvalidDocument] for args in input_tuples]
# Leave out input tuples where there are invalid docs
input_tuples = [input_tuple for (input_tuple, invalid) in zip(input_tuples, invalids) if len(invalid) == 0]
# Only call the inner function in those without invalid docs
results = fn(self, input_tuples) if len(input_tuples) else []
# Reinsert a single invalid doc in the positions where they were in the input
for i, invalid in enumerate(invalids):
if len(invalid) > 0:
# As with the single-doc version, use the first invalid doc among the inputs
results.insert(i, invalid[0])
return results
return _fn


def invalid_doc_on_error(fn):
"""
    Decorator to apply to process_document() methods that causes all exceptions to be caught and an InvalidDocument
    to be returned as the result, instead of letting the error propagate up and halt the whole corpus
    processing.
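
    A usage sketch, stacking with skip_invalid so that invalid inputs are passed
    through untouched and new errors become InvalidDocuments (``risky_processing()``
    is hypothetical)::

        @skip_invalid
        @invalid_doc_on_error
        def process_document(self, archive, filename, doc):
            return risky_processing(doc)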
"""
def _fn(self, *args, **kwargs):
try:
return fn(self, *args, **kwargs)
except StopProcessing:
            # Processing was cancelled, killed or otherwise brought to a halt
# Don't report this as an error processing a doc, but raise it
raise
except Exception, e:
# Error while processing the document: output an invalid document, with some error information
if isinstance(self, TarredCorpus):
# Decorator wrapped a process_document() method on a datatype
# Instead of the module name, output the datatype name and its base dir
return InvalidDocument("datatype:%s[%s]" % (self.datatype_name, self.base_dir),
"%s\n%s" % (e, format_exc()))
else:
# This covers the case of wrapping a process_document() function for a map factory,
# since the first argument is always the worker process
return InvalidDocument(self.info.module_name, "%s\n%s" % (e, format_exc()))
return _fn


def invalid_docs_on_error(fn):
"""
Decorator to apply to process_documents() methods that causes all exceptions to be caught and an InvalidDocument
to be returned as the result for every input document.
"""
def _fn(self, input_tuples):
try:
return fn(self, input_tuples)
except StopProcessing:
            # Processing was cancelled, killed or otherwise brought to a halt
# Don't report this as an error processing a doc, but raise it
raise
except Exception, e:
# Error while processing the document: output invalid documents, with some error information
if isinstance(self, TarredCorpus):
# Decorator wrapped a process_document() method on a datatype
# Instead of the module name, output the datatype name and its base dir
return [InvalidDocument("datatype:%s[%s]" % (self.datatype_name, self.base_dir),
"%s\n%s" % (e, format_exc()))] * len(input_tuples)
else:
return [InvalidDocument(self.info.module_name, "%s\n%s" % (e, format_exc()))] * len(input_tuples)
return _fn


class ProcessOutput(object):
"""
    Wrapper for all result data coming out of a worker.
"""

    def __init__(self, archive, filename, data):
self.data = data
self.filename = filename
self.archive = archive


class DocumentProcessorPool(object):
"""
Base class for pools that provide an easy implementation of parallelization for document map modules.
Defines the core interface for pools.
If you're using multiprocessing, you'll want to use the multiprocessing-specific subclass.
"""

    def __init__(self, processes):
self.output_queue = self.create_queue()
self.input_queue = self.create_queue(2*processes)
self.exception_queue = self.create_queue()
self.processes = processes

    @staticmethod
    def create_queue(maxsize=None):
"""
        May be overridden by subclasses to provide different implementations of a queue. By default, uses the
        standard thread-safe Queue type. Whatever is returned, it should implement the interface of Queue.Queue.
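
        For example, a multiprocessing-based subclass might override it along
        these lines::

            @staticmethod
            def create_queue(maxsize=None):
                from multiprocessing import Queue as MPQueue
                return MPQueue(maxsize or 0)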
"""
return Queue(maxsize=maxsize)


class DocumentMapProcessMixin(object):
"""
    Mixin/base class that all worker processes for document map pools should inherit from.
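
    A minimal worker sketch; ``load_model()`` and the processing call are
    illustrative stand-ins for real set-up and mapping code::

        class MyWorker(DocumentMapProcessMixin, Thread):
            def set_up(self):
                self.model = load_model()

            def process_document(self, archive, filename, doc):
                return self.model.process(doc)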
"""

    def __init__(self, input_queue, output_queue, exception_queue, docs_per_batch=1):
self.docs_per_batch = docs_per_batch
self.exception_queue = exception_queue
self.output_queue = output_queue
self.input_queue = input_queue

    def set_up(self):
"""
Called when the process starts, before it starts accepting documents.
"""
pass

    def process_document(self, archive, filename, *docs):
raise NotImplementedError

    def process_documents(self, doc_tuples):
"""
        Batched version of process_document(). Default implementation just calls process_document() on each document,
        but if you want to group documents together and process multiple at once, you can override this method
        and make sure `docs_per_batch` is set to a value greater than 1.

        Each item in the list of doc tuples should be a tuple of the positional args to process_document() --
        i.e. archive_name, filename, doc_from_corpus1, [doc_from_corpus2, ...]
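
        For example, with two aligned input corpora, ``doc_tuples`` might look like::

            [("archive0.tar", "doc001.txt", doc_a1, doc_b1),
             ("archive0.tar", "doc002.txt", doc_a2, doc_b2)]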
"""
return [self.process_document(*doc_tuple) for doc_tuple in doc_tuples]

    def tear_down(self):
"""
Called from within the process after processing is complete, before exiting.
"""
pass


class WorkerStartupError(Exception):
def __init__(self, *args, **kwargs):
self.cause = kwargs.pop("cause", None)
self.debugging_info = kwargs.pop("debugging_info", None)
super(WorkerStartupError, self).__init__(*args, **kwargs)


class WorkerShutdownError(Exception):
def __init__(self, *args, **kwargs):
self.cause = kwargs.pop("cause", None)
super(WorkerShutdownError, self).__init__(*args, **kwargs)