Source code for pimlico.datatypes.word_annotations

# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html

import re

from operator import itemgetter

from pimlico.datatypes.base import DatatypeLoadError, DynamicOutputDatatype, DynamicInputDatatypeRequirement
from pimlico.datatypes.documents import RawDocumentType
from pimlico.datatypes.tokenized import TokenizedCorpus
from pimlico.datatypes.tar import TarredCorpus, TarredCorpusWriter, pass_up_invalid


__all__ = [
    "WordAnnotationCorpus", "WordAnnotationCorpusWriter", "SimpleWordAnnotationCorpusWriter",
    "AddAnnotationField", "WordAnnotationCorpusWithRequiredFields",
    "AnnotationParseError", "WordAnnotationsDocumentType"
]


class WordAnnotationsDocumentType(RawDocumentType):
    def __init__(self, options, metadata):
        super(WordAnnotationsDocumentType, self).__init__(options, metadata)
        self._word_re = None
        self._word_boundary = None
        self._sentence_boundary_re = None

    @property
    def sentence_boundary_re(self):
        if self._sentence_boundary_re is None:
            # Load the annotation format from the metadata
            if "sentence_boundary" not in self.metadata:
                raise DatatypeLoadError(
                    "word annotation corpus does not have a sentence boundary specified in its metadata"
                )
            # Prepare a regex for detecting sentence boundaries
            boundary = self.metadata["sentence_boundary"].replace("\\n", "\n")
            # The boundary specifier is escaped, so it is matched as a literal string
            self._sentence_boundary_re = re.compile(re.escape(boundary), re.MULTILINE)
        return self._sentence_boundary_re

    @property
    def word_boundary(self):
        if self._word_boundary is None:
            # Load the annotation format from the metadata
            if "word_boundary" not in self.metadata:
                raise DatatypeLoadError(
                    "word annotation corpus does not have a word boundary specified in its metadata"
                )
            # Allow \n to be used as (or in) the word boundary
            self._word_boundary = self.metadata["word_boundary"].replace("\\n", "\n")
        return self._word_boundary

    @property
    def word_re(self):
        if self._word_re is None:
            # Load the annotation format from the metadata
            if "word_format" not in self.metadata:
                raise DatatypeLoadError(
                    "word annotation corpus does not have a word format specified in its metadata"
                )
            if "nonword_chars" not in self.metadata:
                raise DatatypeLoadError(
                    "word annotation corpus does not have non-word chars specified in its metadata"
                )
            # Prepare a regex for parsing individual words
            fmt = self.metadata["word_format"].replace("\\n", "\n")
            # First escape the whole thing so that we can use characters that have a special meaning in a regex
            fmt = re.escape(fmt)
            # The word format includes field specifiers of the form {name}
            # Replace these to make a regex with a named group
            nonwords = self.metadata["nonword_chars"].replace("\\n", "\n")
            word_re = re.sub(r"\\{(.+?)\\}", r"(?P<\1>[^%s]*)" % re.escape(nonwords), fmt)
            # Require the re to match the full string: it will only be called on individual words
            word_re = "%s$" % word_re
            # Compile the resulting re
            self._word_re = re.compile(word_re)
        return self._word_re

    def process_document(self, raw_data):
        sentences = []
        while len(raw_data):
            # Find the next sentence boundary
            sb = self.sentence_boundary_re.search(raw_data)
            if sb is None:
                # No more sentence boundaries: the rest must be a single sentence
                sentence_text = raw_data
                raw_data = ""
            else:
                # Process the text up to the next boundary as a sentence
                sentence_text = raw_data[:sb.start()]
                raw_data = raw_data[sb.end():]
            # Split the sentence on word boundaries
            words = sentence_text.split(self.word_boundary)
            # Parse each word
            matches = [self.word_re.match(word) for word in words]
            # Check that all the words matched the word re
            if None in matches:
                raise AnnotationParseError(
                    "word did not match regex for word format: %s. Matching using: %s" %
                    (words[matches.index(None)], self.word_re.pattern)
                )
            word_dicts = [match.groupdict() for match in matches]
            sentences.append(word_dicts)
        return sentences
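

# Illustrative sketch, not part of the module: how the format metadata is turned into a parsing
# regex and what process_document() produces. The metadata values below are made up for the
# example, and it is assumed that RawDocumentType's constructor simply stores the options and
# metadata it is given.
def _example_word_annotation_parsing():
    metadata = {
        "sentence_boundary": "\\n",
        "word_boundary": " ",
        "word_format": "{word}|{pos}",
        "nonword_chars": "| \\n",
    }
    doc_type = WordAnnotationsDocumentType({}, metadata)
    # word_re is built as (?P<word>[^...]*)\|(?P<pos>[^...]*)$, so each sentence comes out as a
    # list of dicts mapping field names to values:
    #   [[{"word": "The", "pos": "DT"}, {"word": "cat", "pos": "NN"}],
    #    [{"word": "It", "pos": "PRP"}, {"word": "sat", "pos": "VBD"}]]
    return doc_type.process_document("The|DT cat|NN\nIt|PRP sat|VBD")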


class WordAnnotationCorpus(TarredCorpus):
    datatype_name = "word_annotations"
    data_point_type = WordAnnotationsDocumentType

    # Subclasses may provide a list of the fields included for each word.
    # Doing so allows an extra level of type checking, since datatype users know, before the dataset
    # is available, which fields will be in it
    annotation_fields = None

    def __init__(self, base_dir, pipeline):
        super(WordAnnotationCorpus, self).__init__(base_dir, pipeline)
        self._sentence_boundary_re = None
        self._word_re = None
        self._word_boundary = None

    def read_annotation_fields(self):
        """
        Get the available annotation fields from the dataset's configuration. These are the actual
        fields that will be available in the dictionary produced for each word.

        """
        # To make sure the fields are in the order in which they're specified, order by matching group number
        return list(map(
            itemgetter(0),
            sorted(self.data_point_type_instance.word_re.groupindex.items(), key=itemgetter(1))
        ))

    def data_ready(self):
        if not super(WordAnnotationCorpus, self).data_ready():
            return False
        # We now know at least that the data dir exists
        # Check that the required formats are specified in the metadata
        try:
            self.data_point_type_instance.sentence_boundary_re
            self.data_point_type_instance.word_boundary
            self.data_point_type_instance.word_re
        except DatatypeLoadError:
            return False
        else:
            return True


def WordAnnotationCorpusWithFields(*fields):
    """
    Functional type (factory) to create a subclass of WordAnnotationCorpus that has an exact set of
    fields. Makes it slightly easier to specify output datatypes, without having to define a subclass
    of WordAnnotationCorpus first.

    """
    # Make a suitable class name, to aid with debugging
    cls_name = "WordAnnotationCorpusWith%s" % "And".join(filter(str.isalnum, f).capitalize() for f in fields)
    # Define a subclass of WordAnnotationCorpus that specifies the annotation_fields attribute
    return type(cls_name, (WordAnnotationCorpus,), dict(annotation_fields=fields))
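

# Illustrative sketch, not part of the module: a module definition could declare an output type with
# an exact field set (the output name here is hypothetical), e.g.
#
#     module_outputs = [("annotations", WordAnnotationCorpusWithFields("word", "pos"))]
#
# This produces a class named WordAnnotationCorpusWithWordAndPos, equivalent to subclassing
# WordAnnotationCorpus with annotation_fields = ("word", "pos").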


class WordAnnotationCorpusWriter(TarredCorpusWriter):
    """
    Ensures that the correct metadata is provided for a word annotation corpus. Doesn't take care of
    the formatting of the data: that needs to be done by the writing code, or by a subclass.

    """
    def __init__(self, sentence_boundary, word_boundary, word_format, nonword_chars, base_dir, **kwargs):
        super(WordAnnotationCorpusWriter, self).__init__(base_dir, **kwargs)
        self.metadata["sentence_boundary"] = sentence_boundary.replace("\n", "\\n")
        self.metadata["word_boundary"] = word_boundary.replace("\n", "\\n")
        self.metadata["word_format"] = word_format.replace("\n", "\\n")
        self.metadata["nonword_chars"] = nonword_chars.replace("\n", "\\n")
        self.write_metadata()


class SimpleWordAnnotationCorpusWriter(WordAnnotationCorpusWriter):
    """
    Takes care of writing word annotations in a simple format, where each line contains a sentence,
    words are separated by spaces and a series of annotation fields for each word are separated by
    \\|s (or a given separator). This corresponds to the standard tag format for C&C.

    """
    def __init__(self, base_dir, field_names, field_sep=u"|", **kwargs):
        self.field_names = field_names
        self.field_sep = field_sep
        # Prepare a word format that includes the given field names
        word_format = field_sep.join(u"{%s}" % field for field in field_names)
        super(SimpleWordAnnotationCorpusWriter, self).__init__(
            u"\n", u" ", word_format, u" \n%s" % field_sep, base_dir, **kwargs
        )

    @pass_up_invalid
    def document_to_raw_data(self, doc):
        """
        Takes data in the form of a list of sentences, where each is a list of words, where each is a
        list of values for each field (in the same order in which the field names were given). Encodes
        it in a format that can be read by a WordAnnotationCorpus.

        :param doc: sentence data in the form described above
        """
        return u"\n".join(
            u" ".join(self.field_sep.join(word_fields) for word_fields in sentence)
            for sentence in doc
        )
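

# Illustrative usage sketch, not part of the module. Field values for each word are given in the
# order the field names were passed in. The writer interface used here (context manager plus
# add_document()) is inherited from TarredCorpusWriter and assumed, not defined in this file.
def _example_simple_writer(output_dir):
    doc = [[[u"The", u"DT"], [u"cat", u"NN"]],
           [[u"It", u"PRP"], [u"sat", u"VBD"]]]
    with SimpleWordAnnotationCorpusWriter(output_dir, ["word", "pos"]) as writer:
        # document_to_raw_data() encodes the document as one sentence per line, with words separated
        # by spaces and fields by "|":
        #   The|DT cat|NN
        #   It|PRP sat|VBD
        writer.add_document("archive1", "doc1", doc)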


class AddAnnotationField(DynamicOutputDatatype):
    def __init__(self, input_name, add_fields):
        """
        Dynamic type constructor that can be used in place of a module's output type. When called
        (when the output type is needed), dynamically creates a new type that is a WordAnnotationCorpus
        with the same fields as the named input to the module, with the addition of one or more new ones.

        Only works if the input datatype explicitly declares the fields it makes available.

        :param input_name: input to the module whose fields we extend
        :param add_fields: field or fields to add, as a string name or list of string names
        """
        super(AddAnnotationField, self).__init__()
        # Make it easy to add just a single field, the most common case
        self.input_name = input_name
        if isinstance(add_fields, basestring):
            add_fields = [add_fields]
        self.add_fields = add_fields
        self.datatype_doc_info = ":class:`WordAnnotationCorpus` with %s" % ", ".join(add_fields)

    def get_datatype(self, module_info):
        from pimlico.core.modules.base import ModuleInfoLoadError

        input_datatype = module_info.get_input_datatype(self.input_name)
        # Allow the special case where the input datatype is a tokenized corpus:
        # pretend it's an annotated corpus with no annotations, just words
        if issubclass(input_datatype, TokenizedCorpus):
            base_annotation_fields = ["word"]
            new_datatype_name = "word_annotations_%s" % "+".join(self.add_fields)
        else:
            if not issubclass(input_datatype, WordAnnotationCorpus):
                raise ModuleInfoLoadError(
                    "cannot construct a dynamic word annotation corpus type, since the input we're "
                    "extending isn't a word annotation corpus. Input '%s' is a %s" %
                    (self.input_name, input_datatype.__name__)
                )
            if input_datatype.annotation_fields is None:
                raise ModuleInfoLoadError(
                    "cannot construct a word annotation corpus type by adding fields to input '%s', "
                    "since the input type, %s, doesn't explicitly declare its annotation fields" %
                    (self.input_name, input_datatype.__name__)
                )
            base_annotation_fields = input_datatype.annotation_fields
            new_datatype_name = "%s+%s" % (input_datatype.datatype_name, "+".join(self.add_fields))

        for field in self.add_fields:
            if field in base_annotation_fields:
                raise ModuleInfoLoadError(
                    "trying to add a field '%s' to data that already has a field with that name" % field
                )

        class ExtendedWordAnnotationCorpus(WordAnnotationCorpus):
            datatype_name = new_datatype_name
            annotation_fields = base_annotation_fields + self.add_fields

        return ExtendedWordAnnotationCorpus

    @classmethod
    def get_base_datatype_class(cls):
        return WordAnnotationCorpus
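

# Illustrative sketch, not part of the module: a module that adds POS tags to word-annotated input
# might declare its types like this (the input/output names are hypothetical), so that its output
# fields mirror the input's declared fields plus "pos":
#
#     module_inputs = [("documents", WordAnnotationCorpusWithRequiredFields("word"))]
#     module_outputs = [("documents", AddAnnotationField("documents", "pos"))]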


class WordAnnotationCorpusWithRequiredFields(DynamicInputDatatypeRequirement):
    """
    Dynamic (functional) type that can be used in place of a module's input type. In typechecking,
    checks whether the input module is a WordAnnotationCorpus (or subtype) and whether its fields
    include all of those required.

    """
    def __init__(self, required_fields):
        super(WordAnnotationCorpusWithRequiredFields, self).__init__()
        # Allow just a single field name to be given
        if isinstance(required_fields, basestring):
            required_fields = [required_fields]
        self.required_fields = required_fields
        # Make useful information available to the documentation builder for the module type
        self.datatype_doc_info = \
            ":class:`WordAnnotationCorpus <pimlico.datatypes.word_annotations.WordAnnotationCorpus>` " \
            "with %s field%s" % (
                " and ".join("'%s'" % f for f in required_fields),
                "s" if len(required_fields) > 1 else ""
            )

    def check_type(self, supplied_type):
        if not issubclass(supplied_type, WordAnnotationCorpus):
            return False
        # If the type doesn't declare its fields, we can't confirm that the required ones are available
        if supplied_type.annotation_fields is None:
            return False
        if not all(field in supplied_type.annotation_fields for field in self.required_fields):
            return False
        return True
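

# Illustrative sketch, not part of the module: how the requirement behaves in typechecking. The
# helper function below is made up for the example and uses only types defined in this file.
def _example_required_fields_check():
    requirement = WordAnnotationCorpusWithRequiredFields(["word", "pos"])
    rich_type = WordAnnotationCorpusWithFields("word", "pos", "lemma")
    poor_type = WordAnnotationCorpusWithFields("word")
    # Accepted: the type declares all the required fields
    assert requirement.check_type(rich_type)
    # Rejected: "pos" is not among the declared fields
    assert not requirement.check_type(poor_type)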


class AnnotationParseError(Exception):
    pass