pimlico.core.modules.map package

Module contents

exception pimlico.core.modules.map.WorkerShutdownError(*args, **kwargs)[source]

Bases: exceptions.Exception

exception pimlico.core.modules.map.WorkerStartupError(*args, **kwargs)[source]

Bases: exceptions.Exception

class pimlico.core.modules.map.DocumentMapModuleExecutor(module_instance_info, **kwargs)[source]

Bases: pimlico.core.modules.base.BaseModuleExecutor

Base class for executors for document map modules. Subclasses should provide the behaviour for each individual document by defining a pool (and worker processes) to handle the documents as they’re fed into it.

Note that in most cases it won’t be necessary to override the pool and worker base classes yourself. Unless you need special behaviour, use the standard implementations and factory functions.

Although the pattern of execution for all document map modules is based on parallel processing (creating a pool, spawning worker processes, etc), this doesn’t mean that all such modules have to be parallelizable. If you have no reason not to parallelize, it’s recommended that you do (with single-process execution as a special case). However, sometimes parallelizing isn’t so simple: in these cases, consider using the tools in the singleproc module.
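The pool-based execution pattern described above can be illustrated with a minimal, self-contained sketch. This is not Pimlico’s actual implementation (the real executor wires in Pimlico’s queues, writers, and progress tracking); it just shows the shape of the pattern, with a stand-in per-document mapping function, using plain multiprocessing.

```python
# Illustrative sketch only: the pool-of-workers document map pattern,
# stripped of Pimlico's queue/writer machinery.
from multiprocessing import Pool


def process_document(doc):
    # Stand-in for the per-document mapping function a subclass would define
    archive, filename, text = doc
    return archive, filename, text.upper()


if __name__ == "__main__":
    # A tiny in-memory "corpus" of (archive, filename, data) tuples
    corpus = [("arch0", "doc%d" % i, "text %d" % i) for i in range(4)]
    with Pool(processes=2) as pool:
        # The pool maps the function over the corpus, preserving order
        results = pool.map(process_document, corpus)
    for archive, filename, data in results:
        print(archive, filename, data)
```

Single-process execution is then just the special case of a pool with one worker.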

create_pool(processes)[source]

Should return an instance of the pool to be used for document processing. Should generally be a subclass of DocumentProcessorPool.

Always called after preprocess().

execute()[source]
postprocess(error=False)[source]

Allows subclasses to define a finishing procedure to be called after corpus processing has finished.

preprocess()[source]

Allows subclasses to define a set-up procedure to be called before corpus processing begins.

retrieve_processing_status()[source]
update_processing_status(docs_completed, archive_name, filename)[source]
class pimlico.core.modules.map.DocumentMapModuleInfo(module_name, pipeline, **kwargs)[source]

Bases: pimlico.core.modules.base.BaseModuleInfo

Abstract module type that maps each document in turn in a corpus. It produces a single output document for every input.

Subclasses should specify the input types, which should all be subclasses of TarredCorpus, and output types, the first of which (i.e. default) should also be a subclass of TarredCorpus. The base class deals with iterating over the input(s) and writing the outputs to a new TarredCorpus. The subclass only needs to implement the mapping function applied to each document (in its executor).

get_detailed_status()[source]
get_writer(output_name, output_dir, append=False)[source]

Get the writer instance that will be given processed documents to write. Should return a subclass of TarredCorpusWriter. The default implementation instantiates a plain TarredCorpusWriter.

get_writers(append=False)[source]
module_outputs = [('documents', <class 'pimlico.datatypes.tar.TarredCorpus'>)]
class pimlico.core.modules.map.DocumentMapProcessMixin(input_queue, output_queue, exception_queue, docs_per_batch=1)[source]

Bases: object

Mixin/base class that should be implemented by all worker processes for document map pools.

notify_no_more_inputs()[source]

Called when there aren’t any more inputs to come.

process_document(archive, filename, *docs)[source]
process_documents(doc_tuples)[source]

Batched version of process_document(). The default implementation just calls process_document() on each document, but if you want to group documents together and process several at once, override this method and make sure docs_per_batch is set > 1.

Each item in the list of doc tuples should be a tuple of the positional args to process_document() – i.e. archive_name, filename, doc_from_corpus1, [doc_from_corpus2, ...]
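What the default batched implementation effectively does can be sketched as follows (this is an illustration, not Pimlico’s exact source): unpack each tuple of positional args and call the single-document method on it, collecting the results in input order.

```python
# Sketch of the default process_documents() behaviour described above.
class Worker:
    def process_document(self, archive, filename, *docs):
        # Hypothetical single-document mapping: join the input docs
        return "%s/%s: %s" % (archive, filename, " + ".join(docs))

    def process_documents(self, doc_tuples):
        # Default behaviour: one process_document() call per tuple,
        # results returned in input order
        return [self.process_document(*doc_tuple) for doc_tuple in doc_tuples]


w = Worker()
print(w.process_documents([("arch0", "doc0", "a"), ("arch0", "doc1", "b", "c")]))
```

A batching override would instead inspect the whole doc_tuples list at once, e.g. to hand a group of documents to an external tool in one call.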

set_up()[source]

Called when the process starts, before it starts accepting documents.

tear_down()[source]

Called from within the process after processing is complete, before exiting.

class pimlico.core.modules.map.DocumentProcessorPool(processes)[source]

Bases: object

Base class for pools that provide an easy implementation of parallelization for document map modules. Defines the core interface for pools.

If you’re using multiprocessing, you’ll want to use the multiprocessing-specific subclass.

static create_queue(maxsize=None)[source]

May be overridden by subclasses to provide different implementations of a Queue. By default, uses the multiprocessing queue type. Whatever is returned, it should implement the interface of Queue.Queue.
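A sketch of the default behaviour described above might look like this (an assumption about the implementation, not the actual source): return a multiprocessing queue, bounded only when a maxsize is given, so that whatever the pool gets back supports the Queue.Queue interface of put()/get().

```python
# Sketch: default queue factory returning a multiprocessing queue.
from multiprocessing import Queue as MPQueue


def create_queue(maxsize=None):
    # Assumed default: unbounded multiprocessing queue when no maxsize is
    # given. A threaded (non-multiprocessing) subclass might return
    # queue.Queue here instead.
    return MPQueue() if maxsize is None else MPQueue(maxsize=maxsize)


q = create_queue()
q.put(("arch0", "doc0"))
print(q.get(timeout=5))
```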

notify_no_more_inputs()[source]
class pimlico.core.modules.map.InputQueueFeeder(input_queue, iterator, complete_callback=None)[source]

Bases: threading.Thread

Background thread to read input documents from an iterator and feed them onto an input queue for worker processes/threads.

check_for_error()[source]

Can be called from the main thread to check whether an error has occurred in this thread and raise a suitable exception if so.

get_next_output_document()[source]
run()[source]
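The feeder pattern can be sketched in a self-contained form (names and details here are illustrative, not Pimlico’s implementation): a daemon thread pulls documents from an iterator and puts them on a bounded queue, so the main thread never has to hold the whole corpus in memory and workers are fed as capacity allows.

```python
# Illustrative sketch of the background input-feeder pattern.
import queue
import threading


class Feeder(threading.Thread):
    def __init__(self, input_queue, iterator, complete_callback=None):
        super().__init__(daemon=True)
        self.input_queue = input_queue
        self.iterator = iterator
        self.complete_callback = complete_callback

    def run(self):
        for item in self.iterator:
            # Blocks when the bounded queue is full, throttling the feeder
            self.input_queue.put(item)
        if self.complete_callback is not None:
            self.complete_callback()


docs = iter([("arch0", "doc%d" % i) for i in range(3)])
q = queue.Queue(maxsize=2)
feeder = Feeder(q, docs)
feeder.start()
received = [q.get() for _ in range(3)]
feeder.join()
print(received)
```

The real class additionally tracks exceptions raised while feeding, which is what check_for_error() exposes to the main thread.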
class pimlico.core.modules.map.ProcessOutput(archive, filename, data)[source]

Bases: object

Wrapper for all result data coming out from a worker.

pimlico.core.modules.map.invalid_doc_on_error(fn)[source]

Decorator to apply to process_document() methods that causes all exceptions to be caught and an InvalidDocument to be returned as the result, instead of letting the error propagate up and call a halt to the whole corpus processing.
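The decorator’s effect can be sketched as follows. InvalidDocument here is a local stand-in for Pimlico’s error-marker datatype, and the wrapper body is an assumption about the implementation: any exception raised inside process_document() is caught and turned into an error result, instead of halting corpus processing.

```python
# Sketch of invalid_doc_on_error with a stand-in InvalidDocument class.
import functools


class InvalidDocument:
    # Local stand-in for Pimlico's InvalidDocument error marker
    def __init__(self, module_name, error_info):
        self.module_name = module_name
        self.error_info = error_info


def invalid_doc_on_error(fn):
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        try:
            return fn(self, *args, **kwargs)
        except Exception as e:
            # Return an error marker as the document's result
            return InvalidDocument("worker", str(e))
    return wrapper


class Worker:
    @invalid_doc_on_error
    def process_document(self, archive, filename, doc):
        return 1 / 0  # always fails, to demonstrate the catch


result = Worker().process_document("arch0", "doc0", "text")
print(type(result).__name__)
```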

pimlico.core.modules.map.invalid_docs_on_error(fn)[source]

Decorator to apply to process_documents() methods that causes all exceptions to be caught and an InvalidDocument to be returned as the result for every input document.

pimlico.core.modules.map.skip_invalid(fn)[source]

Decorator to apply to document map executor process_document() methods where you want to skip doing any processing if any of the input documents are invalid and just pass through the error information.

Be careful not to confuse this with the process_document() methods on datatypes. You don’t need a decorator on those to skip invalid documents, as they are never called on invalid documents anyway.
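A sketch of the skip_invalid pattern, again with a local InvalidDocument stand-in and an assumed wrapper body: if any of the input documents is already an error marker, pass it straight through as the result instead of running the processing code on it.

```python
# Sketch of skip_invalid: pass error markers through untouched.
import functools


class InvalidDocument:
    """Local stand-in for Pimlico's InvalidDocument error marker."""


def skip_invalid(fn):
    @functools.wraps(fn)
    def wrapper(self, archive, filename, *docs):
        invalid = [d for d in docs if isinstance(d, InvalidDocument)]
        if invalid:
            # At least one input is invalid: skip processing and
            # propagate the first error marker as this document's result
            return invalid[0]
        return fn(self, archive, filename, *docs)
    return wrapper


class Worker:
    @skip_invalid
    def process_document(self, archive, filename, doc):
        return doc.upper()


bad = InvalidDocument()
w = Worker()
print(w.process_document("arch0", "doc0", "fine text"))
```

skip_invalids would apply the same check per tuple in the batched process_documents() case.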

pimlico.core.modules.map.skip_invalids(fn)[source]

Decorator to apply to document map executor process_documents() methods where you want to skip doing any processing if any of the input documents are invalid and just pass through the error information.