# This file is part of Pimlico
# Copyright (C) 2020 Mark Granroth-Wilding
# Licensed under the GNU LGPL v3.0 - https://www.gnu.org/licenses/lgpl-3.0.en.html
from __future__ import absolute_import
from __future__ import print_function
from future import standard_library
from pimlico.utils.pimarc import PimarcReader, PimarcWriter
from pimlico.utils.pimarc.reader import StartAfterFilenameNotFound
from pimlico.utils.pimarc.tar import PimarcTarBackend
standard_library.install_aliases()
from builtins import zip
from builtins import next
from builtins import object
from builtins import bytes
import gzip
import os
import zlib
from io import StringIO, BytesIO
from pimlico.datatypes.base import DynamicOutputDatatype
from pimlico.datatypes.corpora import IterableCorpus, DataPointType
from pimlico.datatypes.corpora.data_points import is_invalid_doc
__all__ = [
"GroupedCorpus", "AlignedGroupedCorpora",
"CorpusAlignmentError", "GroupedCorpusIterationError",
"GroupedCorpusWithTypeFromInput", "CorpusWithTypeFromInput"
]
[docs]class GroupedCorpus(IterableCorpus):
# Identifier string for this datatype
# NOTE(review): presumably used to refer to the datatype by name elsewhere in Pimlico -- confirm against IterableCorpus
datatype_name = "grouped_corpus"
# This may be overridden by subclasses to provide filters for documents applied before main doc processing
# NOTE: this is a class-level list, so it is shared by every subclass that does not assign its own
document_preprocessors = []
[docs] class Reader(object):
class Setup(object):
    """
    Reader setup for grouped corpora. Decides whether the corpus data is ready to be
    read and locates the archive files (Pimarc ``.prc`` or legacy ``.tar``) stored
    under the data dir.

    """
    def data_ready(self, base_dir):
        """
        Check whether the corpus stored under ``base_dir`` can be read.

        The data is considered ready only if the superclass check passes, at least one
        archive exists (unless the metadata says the corpus is empty) and the corpus
        is not flagged in its metadata as currently being written.

        :param base_dir: base dir passed on to the superclass check, ``read_metadata()``
            and ``_get_data_dir()``
        :return: True if the corpus is ready to read, False otherwise
        """
        # Run the superclass check -- that the data dir exists
        if not super(GroupedCorpus.Reader.Setup, self).data_ready(base_dir):
            return False
        # Also check that we've got at least one archive in the data dir, unless the length of the corpus is 0
        metadata = self.read_metadata(base_dir)
        if metadata["length"] > 0 and not self._has_archives(self._get_data_dir(base_dir)):
            return False
        # Check whether the corpus is marked as being currently written to
        if "writing" in metadata and metadata["writing"]:
            return False
        return True

    @classmethod
    def _get_archive_filenames(cls, data_dir):
        """
        Return a list of the paths of all archive files found under ``data_dir``
        (empty if ``data_dir`` is None).
        """
        return list(cls._iter_archive_filenames(data_dir))

    @classmethod
    def _iter_archive_filenames(cls, data_dir):
        """
        Iterate over the paths of all archive files found anywhere under ``data_dir``.

        Only one archive format is ever listed: ``.prc`` files if ``_uses_prc()`` says so,
        otherwise ``.tar`` files. Yields nothing if ``data_dir`` is None.
        """
        if data_dir is None:
            return
        # Check for any .prc files: if even one is found, we look only at .prc files
        ext = ".prc" if cls._uses_prc(data_dir) else ".tar"
        for root, dirs, files in os.walk(data_dir):
            for filename in files:
                if filename.endswith(ext):
                    yield os.path.join(root, filename)

    @classmethod
    def _uses_prc(cls, data_dir):
        """
        Work out whether the corpus under ``data_dir`` is stored in Pimarc (``.prc``) files.

        :return: True if any ``.prc`` file is found, or if no archives are found at all
            (including when ``data_dir`` is None); False if only ``.tar`` files are found
        """
        if data_dir is None:
            # No data dir means no archives: treat it like an empty dir and assume prc.
            # Without this guard os.walk(None) raises a TypeError, although
            # _iter_archive_filenames() accepts None for the same argument
            return True
        found_tar = False
        for root, dirs, files in os.walk(data_dir):
            for filename in files:
                if filename.endswith(".prc"):
                    # Found one prc file, so use prc
                    return True
                elif filename.endswith(".tar"):
                    # Found one tar file: if no prc files found, we're clearly using tar
                    found_tar = True
        # No prc archives found:
        #  If tars found, use them
        #  Otherwise, assume we're using prc but don't have any files yet
        return not found_tar

    def _has_archives(self, data_dir):
        """
        Return True if there's at least 1 archive in the dir.
        """
        # Stop at the first archive found: no need to list them all
        for __ in self._iter_archive_filenames(data_dir):
            return True
        return False
def __init__(self, *args, **kwargs):
    """
    Set up the reader: list the archives found in the data dir and prepare the
    single-archive cache used by ``get_archive()``.
    """
    super(GroupedCorpus.Reader, self).__init__(*args, **kwargs)

    # Paths of the archive files (Pimarc or tar), in sorted order
    archive_paths = self.setup._get_archive_filenames(self.data_dir)
    archive_paths.sort()
    self.archive_filenames = archive_paths

    # An archive's name is its file's base name with the extension removed
    self.archives = []
    self.archive_to_archive_filename = {}
    for archive_path in archive_paths:
        archive_name = os.path.splitext(os.path.basename(archive_path))[0]
        self.archives.append(archive_name)
        self.archive_to_archive_filename[archive_name] = archive_path

    # Whether this corpus uses Pimarc (prc) files or tar
    self.uses_tar = not self.setup._uses_prc(self.data_dir)

    # Nothing cached yet: get_archive() keeps hold of the last archive it opened
    self._last_used_archive = None
    self._last_used_archive_name = None
def get_archive(self, archive_name):
    """
    Return a `PimarcReader` for the named archive, or, if using the tar backend, a
    PimarcTarBackend.

    The most recently opened archive is cached and returned again for as long as the
    same name is requested and the archive has not been closed.

    :param archive_name: name of the archive, as listed in ``self.archives``
    :return: open archive reader
    """
    have_cached = self._last_used_archive_name is not None
    if have_cached and self._last_used_archive_name == archive_name \
            and not self._last_used_archive.closed:
        # Use the cached archive
        return self._last_used_archive

    archive_filename = self.archive_to_archive_filename[archive_name]
    # NOTE(review): the stored filenames look like they are already joined onto the
    #  data dir when they are listed, so this join presumably only has an effect
    #  for a relative data dir -- confirm
    archive_path = os.path.join(self.data_dir, archive_filename)
    # Tar archives are still readable for backwards compatibility
    backend = PimarcTarBackend if archive_filename.endswith(".tar") else PimarcReader
    new_archive = backend(archive_path)

    # Close whatever was cached before and cache the new archive in its place
    if self._last_used_archive is not None:
        self._last_used_archive.close()
    self._last_used_archive_name = archive_name
    self._last_used_archive = new_archive
    return self._last_used_archive
def __iter__(self):
    """
    Iterating over the reader is the same as calling ``doc_iter()`` with no arguments.
    """
    documents = self.doc_iter()
    return documents
def doc_iter(self, start_after=None, skip=None, name_filter=None):
    """
    Iterate over the corpus, yielding (document name, document) pairs.

    All arguments are passed straight through to ``archive_iter()``; the archive
    name it yields is simply dropped.
    """
    items = self.archive_iter(start_after=start_after, skip=skip, name_filter=name_filter)
    for item in items:
        __, name, document = item
        yield name, document
def archive_iter(self, start_after=None, skip=None, name_filter=None):
    """
    Iterate over corpus archive by archive, yielding for each document the archive name,
    the document name and the document itself.

    If the corpus metadata has ``gzip`` set, a stored filename ending in ``.gz`` is
    decompressed as a gzip file and the extension is removed to give the document name.
    Other files in a gzipped corpus are decompressed with plain zlib, for backwards
    compatibility with corpora written before the extension was used.

    If ``start_after`` names a document, ``skip`` is not applied within that document's
    archive: counting of skipped documents only begins with the following archive.

    :param name_filter: if given, should be a callable that takes two args, an archive name and
        document name, and returns True if the document should be yielded and False if it should be skipped.
        This can be preferable to filtering the yielded documents, as it skips all document pre-processing
        for skipped documents, so speeds up things like random subsampling of a corpus, where the
        document content never needs to be read in skipped cases
    :param start_after: skip over the first portion of the corpus, until the given document
        is reached. Should be specified as a pair (archive name, doc name). The doc name may
        be None, in which case the whole of the named archive is skipped
    :param skip: skips over the first portion of the corpus, until this number of documents have
        been seen. Values below 1 mean no skipping
    :return: generator of (archive name, doc name, document) triples
    :raises GroupedCorpusIterationError: if the doc name given in ``start_after`` is not
        found in its archive
    """
    gzipped = self.metadata.get("gzip", False)

    if skip is not None and skip < 1:
        skip = None
    # -1 means don't skip anything, otherwise we accumulate how many we've skipped
    skipped = -1 if skip is None else 0

    # Start after we've hit this (archive, doc name)
    started = start_after is None
    # Keep hold of the original request for the error message: start_after gets cleared as we go
    start_after_req = start_after

    for archive_name in self.archives:
        if not started and start_after is not None:
            if start_after[0] == archive_name and start_after[1] is None:
                # Asked to start after an archive, but not given specific filename
                # Skip the whole archive and start at the beginning of the next one
                # Cancel the start_after requirement, but don't start yet, as we might still skip
                start_after = None
                continue
            elif start_after[0] != archive_name:
                # If we're waiting for a particular archive/file, skip archives until we're in the right one
                continue

        # Now we're either reading the whole of this archive, or starting after a filename in it
        with self.get_archive(archive_name) as archive:
            skip_in_archive = None
            start_after_in_archive = None

            # Allow the first portion of the corpus to be skipped
            if start_after is not None:
                # If we've got this far, we're in the right archive, but need to skip past the filename
                start_after_in_archive = start_after[1]
                start_after = None
                started = True
            elif skipped != -1:
                # If we're skipping a certain number of docs, check how long the archive is to know
                # whether to skip it all
                # This is slow for tar, but relatively fast for Pimarc
                archive_length = len(archive)
                if skipped + archive_length <= skip:
                    # We can skip the whole of this archive
                    skipped += archive_length
                    continue
                else:
                    # Skip over the remaining number within this archive
                    skip_in_archive = skip - skipped
                    # Don't skip at all in future archives
                    skipped = -1

            try:
                # Iterate over the files in the archive
                for metadata, raw_data in archive.iter_files(
                        skip=skip_in_archive, start_after=start_after_in_archive):
                    filename = metadata["name"]
                    # By default, doc name is just the same as filename
                    doc_name = filename
                    if gzipped and doc_name.endswith(".gz"):
                        # If we used the .gz extension while writing the file, remove it to get the doc name
                        doc_name = doc_name[:-3]

                    # If subsampling or filtering, decide whether to extract this file
                    if name_filter is not None and not name_filter(archive_name, doc_name):
                        # Reject this file
                        continue

                    if gzipped:
                        # The check must be made on the stored filename: the extension has
                        # already been removed from doc_name above
                        if filename.endswith(".gz"):
                            # Gzipped document
                            with gzip.GzipFile(fileobj=BytesIO(raw_data), mode="rb") as gzip_file:
                                raw_data = gzip_file.read()
                        else:
                            # For backwards-compatibility, where gzip=True, but the gz extension wasn't used, we
                            # just decompress with zlib, without trying to parse the gzip headers
                            raw_data = zlib.decompress(raw_data)

                    # Apply subclass-specific post-processing and produce a document instance
                    document = self.data_to_document(raw_data)

                    yield archive_name, doc_name, document
            except StartAfterFilenameNotFound:
                # Catch the case where the archive/filename requested as a starting point wasn't found
                # With Pimarc, the error is raised immediately
                # With tar, it's only raised when we get to the end of the file without finding the filename
                raise GroupedCorpusIterationError(
                    "tried to start iteration over grouped corpus at document (%s, %s), but filename %s "
                    "wasn't found in archive %s" %
                    (start_after_req[0], start_after_req[1], start_after_req[1], archive_name)
                )
def list_archive_iter(self):
    """
    Iterate over (archive name, document name) pairs for the whole corpus, without
    reading any document contents.

    Document names get the same treatment as in ``archive_iter()``: if the corpus
    metadata has ``gzip`` set, a ``.gz`` extension on the stored filename is removed.
    """
    strip_gz = self.metadata.get("gzip", False)
    for archive_name in self.archives:
        with self.get_archive(archive_name) as archive:
            for stored_name in archive.iter_filenames():
                if strip_gz and stored_name.endswith(".gz"):
                    # The .gz extension was added while writing: it's not part of the doc name
                    yield archive_name, stored_name[:-3]
                else:
                    yield archive_name, stored_name
def list_iter(self):
    """
    Iterate over the list of document names, without processing the doc contents.
    In some cases, this could be considerably faster than iterating over all the docs.

    The names are those yielded by ``list_archive_iter()``, with the archive names dropped.
    """
    for pair in self.list_archive_iter():
        yield pair[1]
[docs] class Writer(object):
"""
Writes a large corpus of documents out to disk, grouping them together in Pimarc archives.
A subtlety is that, as soon as the writer has been initialized,
it must be legitimate to initialize a datatype to read the corpus. Naturally, at this point there will
be no documents in the corpus, but it allows us to do document processing on the fly by initializing
writers and readers to be sure the pre/post-processing is identical to if we were writing the docs to disk
and reading them in again.
The reader above allows reading from tar archives for backwards compatibility. However,
it is no longer possible to write corpora to tar archives. This has been completely replaced
by the new Pimarc archives, which are more efficient to use and allow random access when
necessary without huge speed penalties.
"""
# Metadata values a writer can be given: each key maps to a pair of
# (default value, description of the option)
# The "gzip" value is read from self.metadata when the writer is initialized
metadata_defaults = {
"gzip": (
False,
"Gzip each document before adding it to the archive. Not the same as creating a tarball, "
"since the docs are gzipped *before* adding them, not the whole archive together, but means "
"we can easily iterate over the documents, unzipping them as required"
),
}
# Parameters a writer can be given, in the same (default value, description) form
# The "append" value is read from self.params when the writer is initialized
writer_param_defaults = {
"append": (
False,
"If True, existing archives and their files are not overwritten, the new files are "
"just added to the end. This is useful where we want to restart processing that was "
"broken off in the middle"
),
}
def __init__(self, *args, **kwargs):
    """
    Prepare for writing: read the ``gzip`` and ``append`` options, work out how many
    documents the corpus already contains and mark the corpus as being written.
    """
    super(GroupedCorpus.Writer, self).__init__(*args, **kwargs)
    # "gzip" is stored in the metadata, so we know to unzip when reading
    self.gzip = self.metadata["gzip"]
    self.append = self.params["append"]

    # No archive is open until the first document is added
    self.current_archive_name = None
    self.current_archive = None

    self.metadata["length"] = 0
    if not self.append:
        # A fresh start: any archives left over from an earlier run must go first
        self.delete_all_archives()
    else:
        # Shouldn't rely on the metadata: count up docs in archive to get initial length
        # This can take a long time on a large corpus
        self.metadata["length"] = self._count_written_docs()
    self.doc_count = self.metadata["length"]

    # Flag in the metadata that this corpus is still being written
    self.metadata["writing"] = True
def add_document(self, archive_name, doc_name, doc, metadata=None):
    """
    Add a document to the named archive. All docs should be added to a single archive
    before moving onto the next. If the archive name is the same as the previous
    doc added, the doc's data will be appended. Otherwise, the archive is finalized
    and we move onto the new archive.

    If the writer's ``gzip`` option is set, the data is gzipped before being written and
    ``.gz`` is added to the document name to give the filename stored in the archive.

    :param metadata: dict of metadata values to write with the document. If doc is a document
        instance, the metadata is taken from there first, but these values will override anything
        in the doc object's metadata. If doc is a bytes object, the metadata kwarg is used.
        Neither this dict nor the document's own metadata is modified
    :param archive_name: archive name
    :param doc_name: name of document
    :param doc: document instance, dict of kwargs from which to build a document, or
        bytes object containing document's raw data
    :raises TypeError: if ``doc`` is none of the accepted types, or if its raw data cannot
        be converted to bytes
    """
    # A document instance provides access to the raw data for a document as a bytes (Py3) or string (Py2)
    # If it's not directly available, it will be converted when we try to retrieve the raw data
    try:
        data = doc.raw_data
    except AttributeError:
        # Instead of type-checking every document, we assume that if it has a raw_data attr, this
        # is the right thing to use
        # If it's a dict, we can instantiate a Document object to convert to raw bytes
        if type(doc) is dict:
            data = self.datatype.data_point_type(**doc).raw_data
        # If a bytes object is given, we assume that's the doc's raw data
        elif type(doc) is bytes:
            data = doc
        elif not isinstance(doc, DataPointType.Document):
            # If not, we kick up a fuss, as we've presumably been given something that's not a valid document
            raise TypeError("documents added to a grouped corpus should be instances of the data point type's "
                            "document class. Data point type is {}. Got {}".format(
                self.datatype.data_point_type.name, type(doc).__name__
            ))
        else:
            # Some other problem caused this
            raise
    else:
        # If we got the raw data from a document object, we should also retrieve the metadata from there
        # This can be None
        # Work on a copy, so that the override values below don't end up in the document's own metadata
        doc_metadata = dict(doc.metadata or {})
        # If metadata is also given as a kwarg, override with those values
        if metadata is not None:
            doc_metadata.update(metadata)
        metadata = doc_metadata

    if data is None:
        # For an empty result, signified by None, output an empty file
        data = bytes()

    try:
        # This should already be a bytes object, but converting here makes it more
        # likely that old code, written before the Python 3 conversion, still works
        data = bytes(data)
    except Exception as e:
        # The raw data is something that can't be turned into bytes, e.g. a unicode string
        raise TypeError("tried to add a document of type {}, but its raw_data was of type {}, not "
                        "something compatible with bytes(). "
                        "This is probably a result of the Python 2-3 conversion. (Error: {})".format(
            self.datatype.data_point_type.name, type(data).__name__, e
        ))

    if archive_name != self.current_archive_name:
        # Starting a new archive
        if self.current_archive is not None:
            # Close the old one
            self.current_archive.close()
        self.current_archive_name = archive_name
        arc_filename = os.path.join(self.data_dir, "{}.prc".format(archive_name))
        # If we're appending a corpus and the archive already exists, append to it
        self.current_archive = PimarcWriter(arc_filename,
                                            mode="a" if self.append and os.path.exists(arc_filename) else "w")

    # Add a new document to archive
    if self.gzip:
        # We used to just use zlib to compress, which works fine, but it's not easy to open the files manually
        # Using gzip (i.e. writing gzip headers) makes it easier to use the data outside Pimlico
        # The compressed output is binary, so it must be collected in a bytes buffer: a text
        # buffer raises a TypeError as soon as the gzip file writes to it
        gzip_io = BytesIO()
        with gzip.GzipFile(mode="wb", compresslevel=9, fileobj=gzip_io) as gzip_file:
            gzip_file.write(data)
        data = gzip_io.getvalue()
        filename = "{}.gz".format(doc_name)
    else:
        filename = doc_name

    # Append this document's data to the Pimarc
    self.current_archive.write_file(data, name=filename, metadata=metadata)
    # No flush here: flushing after every write is very slow (see flush() docstring)

    # Keep a count of how many we've added so we can write metadata
    self.doc_count += 1
def flush(self):
    """
    Flush disk write of the archive currently being written. Does nothing if no
    archive has been opened yet.

    This used to be called after adding each new file, but that slows down the writing
    massively. Without it, there is a risk that the written archives are very out
    of date if a process gets forcibly stopped. However, document map processes are
    better now than they used to be at recovering from this situation when restarting,
    so documents are no longer flushed one by one, to speed things up.
    """
    archive = self.current_archive
    if archive is None:
        return
    archive.flush()
def __exit__(self, exc_type, exc_val, exc_tb):
    """
    Finish writing: close the archive that's still open, store the final document
    count in the metadata and remove the flag marking the corpus as being written.
    """
    open_archive = self.current_archive
    if open_archive is not None:
        open_archive.close()
    # Record how many docs the corpus now has and that we're no longer writing it
    self.metadata["length"] = self.doc_count
    del self.metadata["writing"]
    super(GroupedCorpus.Writer, self).__exit__(exc_type, exc_val, exc_tb)
def _count_written_docs(self):
    """
    Count how many docs have already been written, by listing the archives in the
    same way a reader does and adding up their lengths. Used when appending to an
    existing corpus and not trusting the stored length in the metadata.

    :return: total number of docs in all archives found in the data dir
    """
    # Look for already written archives
    existing_archives = GroupedCorpus.Reader.Setup._get_archive_filenames(self.data_dir)
    existing_archives.sort()

    doc_total = 0
    for archive_path in existing_archives:
        # Each archive reports its own number of docs
        with PimarcReader(archive_path) as archive:
            doc_total += len(archive)
    return doc_total
def delete_all_archives(self):
    """
    Make a fresh start at writing this corpus: look for any archives that have
    already been written to the data dir and delete every one of them.
    """
    for existing_archive in GroupedCorpus.Reader.Setup._get_archive_filenames(self.data_dir):
        # Deletion is left to the Pimarc writer
        PimarcWriter.delete(existing_archive)
def exclude_invalid(doc_iter):
    """
    Wrap an iterable of (doc name, doc) pairs so that only the pairs whose doc is
    not an invalid doc get through.

    :param doc_iter: iterable of (doc name, doc) pairs
    :return: generator over the pairs with valid docs
    """
    pairs = iter(doc_iter)

    def _valid_only():
        for name, document in pairs:
            if is_invalid_doc(document):
                continue
            yield name, document

    return _valid_only()
class AlignedGroupedCorpora(object):
    """
    Iterator for iterating over multiple corpora simultaneously that contain the same files, grouped into
    archives in the same way. This is the standard utility for taking multiple inputs to a Pimlico module
    that contain different data but for the same corpus (e.g. output of different tools).

    """
    def __init__(self, readers):
        """
        :param readers: non-empty sequence of grouped corpus readers, all with the same archives
        :raises CorpusAlignmentError: if no readers are given or their archives differ
        """
        if len(readers) == 0:
            raise CorpusAlignmentError("corpus aligner must have at least one corpus, got zero")

        self.readers = readers
        # The first corpus' archives are the reference for all the others
        self.archives = self.readers[0].archives
        for reader in self.readers:
            if reader.archives == self.archives:
                continue
            raise CorpusAlignmentError("not all corpora have the same archives in them, cannot align")

    def __iter__(self):
        """
        Yield (doc name, list of docs) pairs, one doc in the list per corpus.
        """
        for __, doc_name, aligned_docs in self.archive_iter():
            yield doc_name, aligned_docs

    def archive_iter(self, start_after=None, skip=None, name_filter=None):
        """
        Iterate over all corpora in step, yielding for each document the archive name,
        the doc name and a list with each corpus' version of the document.

        ``start_after`` and ``skip`` are passed on to every reader's ``archive_iter()``.
        ``name_filter`` is not: it is called here, once per document, after the
        document has been read from every corpus.

        :raises CorpusAlignmentError: if the corpora do not produce the same
            archive name and doc name at the same point
        """
        # One iterator per corpus, advanced together
        iterators = [
            reader.archive_iter(start_after=start_after, skip=skip) for reader in self.readers
        ]
        for aligned in zip(*iterators):
            archive_name, doc_name = aligned[0][0], aligned[0][1]

            # Every corpus must have given us the same archive and doc name combination
            in_step = all(item[0] == archive_name for item in aligned[1:]) and \
                all(item[1] == doc_name for item in aligned[1:])
            if not in_step:
                raise CorpusAlignmentError(
                    "filenames within archives in grouped corpora do not correspond: %s" %
                    ", ".join(["(%s/%s)" % (item[0], item[1]) for item in aligned])
                )

            # If filtering, the filter is applied only once for all corpora
            if name_filter is not None and not name_filter(archive_name, doc_name):
                continue

            yield archive_name, doc_name, [item[2] for item in aligned]

    def __len__(self):
        """
        The length is that of the first corpus.
        """
        first_reader = self.readers[0]
        return len(first_reader)
class CorpusAlignmentError(Exception):
    """
    Raised when corpora that are supposed to be aligned cannot be iterated over in step.
    """
class GroupedCorpusIterationError(Exception):
    """
    Raised when iteration over a grouped corpus cannot be carried out as requested.
    """