pimlico.core.modules.map package

Module contents

class DocumentMapModuleInfo(module_name, pipeline, **kwargs)[source]

Bases: pimlico.core.modules.base.BaseModuleInfo

Abstract module type that maps over each document in a corpus in turn, producing a single output document for every input.

Subclasses should specify the input types, which should all be subclasses of TarredCorpus, and output types, the first of which (i.e. default) should also be a subclass of TarredCorpus. The base class deals with iterating over the input(s) and writing the outputs to a new TarredCorpus. The subclass only needs to implement the mapping function applied to each document (in its executor).
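Conceptually, the base class handles the iteration and writing, leaving the subclass to supply only the per-document mapping. The pattern can be illustrated with a plain-Python sketch (stand-in names, not Pimlico's actual API):

```python
# Minimal stand-in sketch of the document-map pattern (NOT Pimlico's real
# API): one output document is produced for every input document, with the
# archive/filename structure preserved.

def map_corpus(corpus, process_document):
    """Apply process_document to each doc, keeping archive and filename."""
    for archive, filename, doc in corpus:
        yield archive, filename, process_document(archive, filename, doc)

# Hypothetical corpus and mapping: uppercase the text of each document
corpus = [
    ("archive1.tar", "doc001.txt", "first document"),
    ("archive1.tar", "doc002.txt", "second document"),
]
results = list(map_corpus(corpus, lambda archive, fn, doc: doc.upper()))
```

In the real framework, the iteration and writing are handled by the executor and a TarredCorpusWriter, not a generator like this.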

module_outputs = [('documents', <class 'pimlico.datatypes.tar.TarredCorpus'>)]
input_corpora
get_writer(output_name, output_dir, append=False)[source]

Get the writer instance that will be given processed documents to write. Should return a subclass of TarredCorpusWriter. The default implementation instantiates a plain TarredCorpusWriter.

get_writers(append=False)[source]
get_detailed_status()[source]

Returns a list of strings, containing detailed information about the module’s status that is specific to the module type. This may include module-specific information about execution status, for example.

Subclasses may override this to supply useful (human-readable) information specific to the module type. They should call the super method.

class DocumentMapModuleExecutor(module_instance_info, **kwargs)[source]

Bases: pimlico.core.modules.base.BaseModuleExecutor

Base class for executors for document map modules. Subclasses should provide the behaviour for each individual document by defining a pool (and worker processes) to handle the documents as they’re fed into it.

Note that in most cases it won’t be necessary to override the pool and worker base classes yourself. Unless you need special behaviour, use the standard implementations and factory functions.

Although the pattern of execution for all document map modules is based on parallel processing (creating a pool, spawning worker processes, etc), this doesn’t mean that all such modules have to be parallelizable. If you have no reason not to parallelize, it’s recommended that you do (with single-process execution as a special case). However, sometimes parallelizing isn’t so simple: in these cases, consider using the tools in the singleproc module.

preprocess()[source]

Allows subclasses to define a set-up procedure to be called before corpus processing begins.

postprocess(error=False)[source]

Allows subclasses to define a finishing procedure to be called after corpus processing has finished.

create_pool(processes)[source]

Should return an instance of the pool to be used for document processing. Should generally be a subclass of DocumentProcessorPool.

Always called after preprocess().

retrieve_processing_status()[source]
update_processing_status(docs_completed, archive_name, filename)[source]
execute()[source]

Run the actual module execution.

May return None, in which case it’s assumed to have fully completed. If a string is returned, it’s used as an alternative module execution status. Used, e.g., by multi-stage modules that need to be run multiple times.

skip_invalid(fn)[source]

Decorator to apply to document map executor process_document() methods where you want to skip doing any processing if any of the input documents are invalid and just pass through the error information.

Be careful not to confuse this with the process_document() methods on datatypes. Those don’t need a decorator to skip invalid documents, since they are never called on invalid documents anyway.
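The behaviour of skip_invalid can be sketched with stand-in classes (not Pimlico's actual implementation): if any input document is an invalid-document marker, the wrapper returns it unchanged instead of calling the processing method.

```python
# Stand-in sketch of the skip_invalid pattern (NOT Pimlico's actual
# implementation): pass invalid inputs straight through to the output.
from functools import wraps


class InvalidDocument(object):
    """Toy stand-in for Pimlico's invalid-document error marker."""
    def __init__(self, module_name, error_info=""):
        self.module_name = module_name
        self.error_info = error_info


def skip_invalid(fn):
    @wraps(fn)
    def wrapper(self, archive, filename, *docs):
        invalid = [d for d in docs if isinstance(d, InvalidDocument)]
        if invalid:
            # Skip processing: pass the error information through
            return invalid[0]
        return fn(self, archive, filename, *docs)
    return wrapper


class Worker(object):
    @skip_invalid
    def process_document(self, archive, filename, doc):
        return doc.upper()


w = Worker()
bad = InvalidDocument("earlier_module", "parse failed")
result_ok = w.process_document("a.tar", "doc1.txt", "ok text")
result_bad = w.process_document("a.tar", "doc2.txt", bad)
```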

skip_invalids(fn)[source]

Decorator to apply to document map executor process_documents() methods where you want to skip doing any processing if any of the input documents are invalid and just pass through the error information.

invalid_doc_on_error(fn)[source]

Decorator to apply to process_document() methods that causes all exceptions to be caught and an InvalidDocument to be returned as the result, instead of letting the error propagate up and call a halt to the whole corpus processing.
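A stand-in sketch of this error-trapping pattern (again, not Pimlico's actual implementation; the module name passed to the marker is hypothetical):

```python
# Stand-in sketch of the invalid_doc_on_error pattern (NOT Pimlico's
# actual implementation): trap exceptions in per-document processing and
# return an invalid-document marker so corpus processing continues.
from functools import wraps


class InvalidDocument(object):
    """Toy stand-in for Pimlico's invalid-document error marker."""
    def __init__(self, module_name, error_info=""):
        self.module_name = module_name
        self.error_info = error_info


def invalid_doc_on_error(fn):
    @wraps(fn)
    def wrapper(self, *args, **kwargs):
        try:
            return fn(self, *args, **kwargs)
        except Exception as e:
            # Turn the error into a document-level failure instead of
            # halting the whole corpus
            return InvalidDocument("my_module", str(e))
    return wrapper


class Worker(object):
    @invalid_doc_on_error
    def process_document(self, archive, filename, doc):
        # Raises ValueError for non-numeric documents
        return str(int(doc) * 2)


w = Worker()
ok = w.process_document("a.tar", "doc1.txt", "21")
failed = w.process_document("a.tar", "doc2.txt", "not a number")
```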

invalid_docs_on_error(fn)[source]

Decorator to apply to process_documents() methods that causes all exceptions to be caught and an InvalidDocument to be returned as the result for every input document.

class ProcessOutput(archive, filename, data)[source]

Bases: object

Wrapper for all result data coming out from a worker.

class InputQueueFeeder(input_queue, iterator, complete_callback=None)[source]

Bases: threading.Thread

Background thread to read input documents from an iterator and feed them onto an input queue for worker processes/threads.

get_next_output_document()[source]
check_invalid(archive, filename)[source]

Checks whether a given document was invalid in the input. Once the check has been performed, the item is removed from the list, for efficiency, so this should only be called once per document.

run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

check_for_error()[source]

Can be called from the main thread to check whether an error has occurred in this thread and raise a suitable exception if so.

shutdown(timeout=3.0)[source]

Cancel the feeder, if it’s still feeding, and stop the thread. Call only after you’re sure you no longer need anything from any of the queues. Waits for the thread to end.

Call from the main thread (that created the feeder) only.

class DocumentProcessorPool(processes)[source]

Bases: object

Base class for pools that provide an easy implementation of parallelization for document map modules. Defines the core interface for pools.

If you’re using multiprocessing, you’ll want to use the multiprocessing-specific subclass.

notify_no_more_inputs()[source]
static create_queue(maxsize=None)[source]

May be overridden by subclasses to provide different implementations of a Queue. By default, uses the multiprocessing queue type. Whatever is returned, it should implement the interface of Queue.Queue.
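For instance, a hypothetical thread-based pool might override this to return a plain thread-safe queue rather than the multiprocessing default (Queue.Queue in Python 2 corresponds to queue.Queue in Python 3):

```python
# Sketch of overriding create_queue in a hypothetical pool subclass (the
# class name here is illustrative, not part of Pimlico's API).
try:
    import Queue as queue  # Python 2
except ImportError:
    import queue           # Python 3


class ThreadedDocumentProcessorPool(object):
    """Stand-in for a DocumentProcessorPool subclass using threads."""

    @staticmethod
    def create_queue(maxsize=None):
        # Queue.Queue treats maxsize <= 0 as unbounded, so map None to 0
        return queue.Queue(maxsize if maxsize is not None else 0)


q = ThreadedDocumentProcessorPool.create_queue(maxsize=2)
q.put("doc1")
q.put("doc2")
first = q.get()
```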

shutdown()[source]
empty_all_queues()[source]
class DocumentMapProcessMixin(input_queue, output_queue, exception_queue, docs_per_batch=1)[source]

Bases: object

Mixin/base class that should be implemented by all worker processes for document map pools.

set_up()[source]

Called when the process starts, before it starts accepting documents.

process_document(archive, filename, *docs)[source]
process_documents(doc_tuples)[source]

Batched version of process_document(). The default implementation simply calls process_document() on each document, but if you want to group documents together and process several at once, you can override this method and make sure docs_per_batch is set to a value greater than 1.

Each item in the list of doc tuples should be a tuple of the positional args to process_document() – i.e. archive_name, filename, doc_from_corpus1, [doc_from_corpus2, …]
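The relationship between the two methods can be sketched with stand-in classes (not Pimlico's actual mixin):

```python
# Stand-in sketch of process_documents() delegating to process_document()
# (NOT Pimlico's actual mixin; class and corpus contents are illustrative).

class Worker(object):
    docs_per_batch = 3  # > 1 signals that documents arrive in batches

    def process_document(self, archive, filename, doc):
        return doc.upper()

    def process_documents(self, doc_tuples):
        # Default behaviour: each tuple holds the positional args of
        # process_document(), i.e. (archive_name, filename, doc, ...)
        return [self.process_document(*doc_tuple) for doc_tuple in doc_tuples]


w = Worker()
batch = [
    ("arch1.tar", "doc1.txt", "alpha"),
    ("arch1.tar", "doc2.txt", "beta"),
]
out = w.process_documents(batch)
```

An override that genuinely processes documents together (e.g. batching calls to an external tool) would replace the loop body rather than delegating per document.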

tear_down()[source]

Called from within the process after processing is complete, before exiting.

notify_no_more_inputs()[source]

Called when there aren’t any more inputs to come.

exception WorkerStartupError(*args, **kwargs)[source]

Bases: exceptions.Exception

exception WorkerShutdownError(*args, **kwargs)[source]

Bases: exceptions.Exception