Source code for pimlico.utils.probability

# This file is part of Pimlico
# Copyright (C) 2020 Mark Granroth-Wilding
# Licensed under the GNU LGPL v3.0 - https://www.gnu.org/licenses/lgpl-3.0.en.html

from builtins import next

from itertools import islice
import random
import itertools
from pimlico.utils.core import infinite_cycle


def limited_shuffle(iterable, buffer_size, rand_generator=None):
    """
    Some algorithms require the order of data to be randomized. An obvious solution is to put it all in
    a list and shuffle, but if you don't want to load it all into memory that's not an option. This method
    iterates over the data, keeping a buffer and choosing at random from the buffer what to put next.
    It's less shuffled than the simpler solution, but limits the amount of memory used at any one time
    to the buffer size.

    """
    it = iter(iterable)
    if rand_generator is None:
        # Default random generator just uses Python's randint to pick a random item from the buffer
        def get_rand():
            while True:
                yield random.randint(0, buffer_size-1)
        rand_generator = iter(get_rand())

    buffer = []
    try:
        # Fill up the buffer
        while len(buffer) < buffer_size:
            buffer.append(next(it))
    except StopIteration:
        # Input was shorter than the buffer: skip straight to draining what we've got
        pass
    else:
        for next_val in it:
            # Pick a random item from the buffer to remove
            index = next(rand_generator)
            yield buffer.pop(index)
            # Add the new value to the buffer to replace the old one
            buffer.append(next_val)
    # No more to take in, just return the rest of the buffer in a random order
    # Here we don't use the given rand_generator, since it shouldn't generally take long anyway
    random.shuffle(buffer)
    for v in buffer:
        yield v
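

# Illustrative example (an addition to this listing, not part of the original module):
# push a stream through limited_shuffle with a small buffer, so that only
# `buffer_size` items are ever held in memory at once. The buffer size of 100 is an
# arbitrary choice for the sketch.
def _demo_limited_shuffle():
    stream = iter(range(1000))
    # Items come out roughly shuffled: each output is drawn at random
    # from a sliding buffer of 100 pending items
    shuffled = list(limited_shuffle(stream, 100))
    # Every input item comes out exactly once, just in a different order
    assert sorted(shuffled) == list(range(1000))
    return shuffled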


def limited_shuffle_numpy(iterable, buffer_size, randint_buffer_size=1000):
    """
    Identical behaviour to :func:`limited_shuffle`, but uses Numpy's random sampling routines to generate
    a large number of random integers at once. This can make execution a bit bursty, but overall tends to
    speed things up, as we get the random sampling over in one big call to Numpy.

    """
    return limited_shuffle(iterable, buffer_size,
                           rand_generator=batched_randint(buffer_size, batch_size=randint_buffer_size))


def batched_randint(low, high=None, batch_size=1000):
    """
    Infinite iterable that produces random numbers in the given range by calling Numpy now and then to
    generate lots of random numbers at once and then yielding them one by one. Faster than sampling one
    at a time.

    :param low: lowest number in range
    :param high: highest number in range (exclusive, following Numpy's randint convention). If not given,
        sample from [0, low)
    :param batch_size: number of ints to generate in one go
    """
    from numpy import random
    while True:
        randints = random.randint(low, high, size=batch_size)
        for i in randints:
            yield i
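

# Illustrative example (not part of the original module): batched_randint is a
# drop-in replacement for the default random generator in limited_shuffle, which
# is exactly how limited_shuffle_numpy uses it. Note the exclusive upper bound.
def _demo_batched_randint():
    gen = batched_randint(100, batch_size=500)
    # The generator is infinite, so take a finite slice; each value is in [0, 100)
    draws = list(islice(gen, 10))
    assert all(0 <= d < 100 for d in draws)
    return draws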


def sequential_document_sample(corpus, start=None, shuffle=None, sample_rate=None):
    """
    Wrapper around a :class:`pimlico.datatypes.tar.TarredCorpus` to draw infinite samples of documents
    from the corpus, by iterating over the corpus (looping infinitely), yielding documents at random.

    If `sample_rate` is given, it should be a float between 0 and 1, specifying the rough proportion of
    documents to sample. A lower value spreads out the documents more on average.

    Optionally, the samples are shuffled within a limited scope. Set `shuffle` to the size of this scope (higher
    will shuffle more, but need to buffer more samples in memory). Otherwise (`shuffle=0`), they will appear
    in the order they were in the original corpus.

    If `start` is given, that number of documents will be skipped before drawing any samples. Set `start=0`
    to start at the beginning of the corpus. By default (`start=None`) a random point in the corpus will
    be skipped to before beginning.

    """
    if start is None:
        # Choose a random point in the dataset to start at
        start = random.randint(0, len(corpus)-1)

    # Start by reading the corpus from the start point onwards, then cycle forever
    doc_iter = itertools.chain(
        # Jump into the corpus the first time round
        corpus.archive_iter(skip=start, subsample=sample_rate),
        # Then loop over the corpus infinitely
        infinite_cycle(corpus.archive_iter(subsample=sample_rate))
    )

    if shuffle:
        # Shuffle the data points (a bit) as we go
        doc_iter = iter(limited_shuffle(doc_iter, shuffle))
    return doc_iter
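

# Illustrative sketch (not part of the original module), assuming `corpus` is an
# already-loaded TarredCorpus. Draws an endless document stream starting at a
# random point, keeping roughly 10% of documents and shuffling within a
# 50-document buffer; both parameter values are arbitrary.
def _demo_sequential_document_sample(corpus):
    samples = sequential_document_sample(corpus, shuffle=50, sample_rate=0.1)
    # The stream is infinite, so cut it off after a fixed number of samples;
    # each item is whatever corpus.archive_iter() yields
    return list(islice(samples, 5))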


def sequential_sample(iterable, start=0, shuffle=None, sample_rate=None):
    """
    Draw infinite samples from an iterable, by iterating over it (looping infinitely), yielding items
    at random.

    If `sample_rate` is given, it should be a float between 0 and 1, specifying the rough proportion of
    items to sample. A lower value spreads out the items more on average.

    Optionally, the samples are shuffled within a limited scope. Set `shuffle` to the size of this scope (higher
    will shuffle more, but need to buffer more samples in memory). Otherwise (`shuffle=0`), they will appear
    in the order they were in the original iterable.

    If `start` is given, that number of items will be skipped before drawing any samples. Set `start=0`
    to start at the beginning of the iterable. Note that setting this to a high number can result in a slow
    start-up, if iterating over the items is slow.

    .. note::

       If you're sampling documents from a `TarredCorpus`, it's better to use
       :func:`sequential_document_sample`, since it makes use of `TarredCorpus`'s built-in features to
       do the skipping and sampling more efficiently.

    """
    # Cycle forever
    doc_iter = infinite_cycle(iterable)
    if sample_rate is not None and sample_rate != 1.:
        # Subsample to space out the items that get included
        doc_iter = subsample(doc_iter, sample_rate)
    if start > 0:
        # Skip some items at the start
        doc_iter = islice(doc_iter, start, None)
    if shuffle:
        # Shuffle the data points (a bit) as we go
        doc_iter = iter(limited_shuffle(doc_iter, shuffle))
    return doc_iter
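

# Illustrative example (not part of the original module): infinite sampling from
# a plain list, skipping the first 3 items, keeping roughly half of them, with
# light local shuffling. All the parameter values are arbitrary.
def _demo_sequential_sample():
    items = ["a", "b", "c", "d", "e"]
    samples = sequential_sample(items, start=3, shuffle=10, sample_rate=0.5)
    # The iterator loops over the list forever, so take a finite slice
    return list(islice(samples, 20))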


def subsample(iterable, sample_rate):
    """
    Subsample the given iterable at a given rate, between 0 and 1.

    """
    for item in iterable:
        if random.random() <= sample_rate:
            yield item
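

# Illustrative example (not part of the original module): keep roughly 10% of a
# stream. Over 10,000 items the sample size should be around 1,000, though it
# varies from run to run, since each item is kept independently at random.
def _demo_subsample():
    kept = list(subsample(range(10000), 0.1))
    return len(kept)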