# Source code for pimlico.core.modules.map.singleproc

# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html

"""
Sometimes the simple multiprocessing-based approach to map module parallelization just isn't suitable.
This module provides an equivalent set of implementations and convenience functions that don't use
multiprocessing, but conform to the pool-based execution pattern by creating a single-thread pool.

"""
from .threaded import ThreadingMapModuleExecutor, ThreadingMapThread, ThreadingMapPool


class SingleThreadMapModuleExecutor(ThreadingMapModuleExecutor):
    """
    Threading-based map module executor whose worker pool is always created with
    exactly one worker, regardless of how many processes were requested.
    """

    def create_pool(self, processes):
        """
        Create the worker pool via the parent executor, always asking for a single worker.

        :param processes: requested pool size; deliberately disregarded here
        :return: the pool produced by the parent class's create_pool when asked for one worker
        """
        # The requested size is ignored on purpose: this executor is non-parallel by design
        worker_count = 1
        parent = super(SingleThreadMapModuleExecutor, self)
        return parent.create_pool(worker_count)
def single_process_executor_factory(process_document_fn, preprocess_fn=None, postprocess_fn=None,
                                    worker_set_up_fn=None, worker_tear_down_fn=None, batch_docs=None):
    """
    Factory function for creating an executor that uses the single-thread implementations of
    document-map pools and workers. This is an easy way to implement a non-parallelized executor.

    process_document_fn may be either:

     - a subclass of ThreadingMapThread, which is used directly as the worker type, or
     - a function that takes the following arguments:
        - the worker thread instance (the generated ThreadingMapThread subclass's ``self``)
        - archive name
        - document name
        - the rest of the args are the document itself, from each of the input corpora

    If preprocess_fn is given, it is called once before execution begins, with the executor
    as an argument.

    If postprocess_fn is given, it is called at the end of execution, including on the way out
    after an error, with the executor as an argument and a kwarg *error* which is True if
    execution failed.

    If worker_set_up_fn / worker_tear_down_fn are given, they are called with the worker thread
    instance from the worker's set_up() / tear_down(). They are only used when
    process_document_fn is a function; if a ThreadingMapThread subclass is given, that class's
    own set_up/tear_down are used and these arguments are ignored.

    batch_docs is accepted but not used by this factory.

    :raises TypeError: if process_document_fn is a class that is not a subclass of
        ThreadingMapThread
    :return: a new executor class (subclass of SingleThreadMapModuleExecutor)
    """
    if isinstance(process_document_fn, type):
        # A ready-made worker class was given: validate it and use it as-is
        if not issubclass(process_document_fn, ThreadingMapThread):
            raise TypeError("called single_process_executor_factory with a worker type that's not a subclass of "
                            "ThreadingMapThread: got %s" % process_document_fn.__name__)
        worker_type = process_document_fn
    else:
        # Define a worker thread type that delegates to the supplied functions
        class FactoryMadeMapThread(ThreadingMapThread):
            def process_document(self, archive, filename, *docs):
                return process_document_fn(self, archive, filename, *docs)

            def set_up(self):
                if worker_set_up_fn is not None:
                    worker_set_up_fn(self)

            def tear_down(self):
                if worker_tear_down_fn is not None:
                    worker_tear_down_fn(self)

        worker_type = FactoryMadeMapThread

    # Define a pool type that uses this worker thread type
    class FactoryMadeMapPool(ThreadingMapPool):
        THREAD_TYPE = worker_type

    # Finally, define an executor type that creates a pool of the right sort. Pool size is
    # forced to 1 by SingleThreadMapModuleExecutor.create_pool
    class ModuleExecutor(SingleThreadMapModuleExecutor):
        POOL_TYPE = FactoryMadeMapPool

        def preprocess(self):
            super(ModuleExecutor, self).preprocess()
            if preprocess_fn is not None:
                preprocess_fn(self)

        def postprocess(self, error=False):
            super(ModuleExecutor, self).postprocess(error=error)
            if postprocess_fn is not None:
                postprocess_fn(self, error=error)

    return ModuleExecutor