Source code for pimlico.datatypes.features

# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html

import cPickle as pickle
import os
import struct
from decimal import Decimal
from operator import itemgetter

from pimlico.datatypes.base import IterableCorpus, DatatypeLoadError, IterableCorpusWriter
from pimlico.datatypes.documents import RawDocumentType, DataPointType
from pimlico.datatypes.tar import TarredCorpus, TarredCorpusWriter, pass_up_invalid

__all__ = [
    "KeyValueListCorpus", "KeyValueListCorpusWriter", "KeyValueListDocumentType",
    "TermFeatureListCorpus", "TermFeatureListCorpusWriter", "TermFeatureListDocumentType",
    "IndexedTermFeatureListCorpus", "IndexedTermFeatureListCorpusWriter", "IndexedTermFeatureListDataPointType",
    "FeatureListScoreCorpus", "FeatureListScoreCorpusWriter", "FeatureListScoreDocumentType",
]


class KeyValueListDocumentType(RawDocumentType):
    def __init__(self, options, metadata):
        super(KeyValueListDocumentType, self).__init__(options, metadata)
        self.separator = self.metadata.get("separator", " ")
        self.fv_separator = self.metadata.get("fv_separator", "=")

    def process_document(self, doc):
        # Read a set of feature-value pairs from each line
        data_points = []
        for line in doc.splitlines():
            # Skip blank lines
            if line.strip():
                # Split up the various feature assignments
                fvs = line.strip().split(self.separator)
                # Now we've split on sep, unescape any instances that were escaped
                fvs = [unescape_sep(self.separator, "ITEMSEP", fv) for fv in fvs]
                # Split each one into a feature-value pair
                fvs = [itemgetter(0, 2)(fv.partition(self.fv_separator)) for fv in fvs]
                # Unescape the fv sep within feature names and feature values
                fvs = [
                    (unescape_sep(self.fv_separator, "FVSEP", fv[0]), unescape_sep(self.fv_separator, "FVSEP", fv[1]))
                    for fv in fvs
                ]
                data_points.append(fvs)
        return data_points

class KeyValueListCorpus(TarredCorpus):
    datatype_name = "key_value_lists"
    data_point_type = KeyValueListDocumentType

class KeyValueListCorpusWriter(TarredCorpusWriter):
    def __init__(self, base_dir, separator=" ", fv_separator="=", **kwargs):
        super(KeyValueListCorpusWriter, self).__init__(base_dir, **kwargs)
        self.fv_separator = fv_separator
        self.separator = separator
        # Put the separators in the metadata, so we know how to read the data in again
        self.metadata["separator"] = separator
        self.metadata["fv_separator"] = fv_separator
        self.write_metadata()

    @pass_up_invalid
    def document_to_raw_data(self, doc):
        # Input should be a list of data points, where each is a list of feature-value pairs
        # One data point per line
        return "\n".join([
            # Fv pairs are separated by separator
            self.separator.join([
                # Make sure they don't include the separator
                escape_sep(
                    self.separator, "ITEMSEP",
                    # Feature and value are separated by fv_separator
                    "%s%s%s" % (
                        # Make sure they don't include the fv separator
                        escape_sep(self.fv_separator, "FVSEP", feature_name),
                        self.fv_separator,
                        escape_sep(self.fv_separator, "FVSEP", feature_value)
                    )
                ) for (feature_name, feature_value) in data_point
            ]) for data_point in doc
        ])

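# Illustrative sketch (not part of the module): writing a small key-value list corpus with the
# writer above. The output path and document contents are hypothetical, and add_document() is
# assumed from the TarredCorpusWriter interface (archive name, document name, document data).
#
#     doc = [
#         [("word", "cat"), ("pos", "NN")],
#         [("word", "sat"), ("pos", "VBD")],
#     ]
#     with KeyValueListCorpusWriter("/path/to/output/dir") as writer:
#         writer.add_document("archive0", "doc0", doc)
#
# Each data point becomes one line, e.g. "word=cat pos=NN".
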
class TermFeatureListDocumentType(KeyValueListDocumentType):
    def process_document(self, doc):
        raw_data_points = super(TermFeatureListDocumentType, self).process_document(doc)
        data_points = []
        for data_point in raw_data_points:
            # Pull out the special "term" feature (usually at the beginning)
            try:
                term = (value for (feature, value) in data_point if feature == "term").next()
            except StopIteration:
                # No "term" feature found -- uh-oh! Catch as invalid doc
                raise ValueError("data point has no 'term' feature: %s" % data_point)
            # The rest of the features are feature counts
            features = dict((feature, int(value)) for (feature, value) in data_point if feature != "term")
            data_points.append((term, features))
        return data_points

class TermFeatureListCorpus(KeyValueListCorpus):
    """
    Special case of KeyValueListCorpus, where one special feature "term" is always present and the
    other feature types are counts of the occurrence of a particular feature with this term in each
    data point.

    """
    datatype_name = "term_feature_lists"
    data_point_type = TermFeatureListDocumentType

class TermFeatureListCorpusWriter(KeyValueListCorpusWriter):
    def __init__(self, base_dir, **kwargs):
        super(TermFeatureListCorpusWriter, self).__init__(base_dir, separator=" ", fv_separator="=", **kwargs)

    @pass_up_invalid
    def document_to_raw_data(self, doc):
        # Input should be a list of data points, where each is a (term, feature count) pair
        # and each feature count is a dictionary mapping feature names to counts
        data = [
            [("term", term)] + [(feature, str(count)) for (feature, count) in feature_counts.items()]
            for (term, feature_counts) in doc
        ]
        return super(TermFeatureListCorpusWriter, self).document_to_raw_data(data)

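# Illustrative sketch (not part of the module): each data point passed to the writer above is a
# (term, feature count dict) pair. The terms and feature names below are hypothetical.
#
#     doc = [
#         ("cat", {"subj_of:sat": 2, "det:the": 1}),
#         ("mat", {"pobj_of:on": 1}),
#     ]
#
# Each data point is serialized through the key-value format as a line like
# "term=cat subj_of:sat=2 det:the=1" (feature order depends on dict iteration).
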
BYTE_FORMATS = {
    # (num bytes, signed)
    (1, True): "b",   # signed char
    (1, False): "B",  # unsigned char
    (2, True): "h",   # signed short
    (2, False): "H",  # unsigned short
    (4, True): "l",   # signed long
    (4, False): "L",  # unsigned long
    (8, True): "q",   # signed long long
    (8, False): "Q",  # unsigned long long
}


def get_struct(bytes, signed):
    # Put together the formatting string for converting ints to bytes
    if (bytes, signed) not in BYTE_FORMATS:
        raise ValueError("invalid specification for int format: signed=%s, bytes=%s. signed must be bool, "
                         "bytes in [1, 2, 4, 8]" % (signed, bytes))
    format_string = "<" + BYTE_FORMATS[(bytes, signed)]
    # Compile the format for faster encoding
    return struct.Struct(format_string)

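# Illustrative sketch (not part of the module): get_struct() compiles a little-endian struct format,
# so a 4-byte unsigned int round-trips like this:
#
#     packer = get_struct(4, False)
#     data = packer.pack(42)          # 4 bytes, little-endian: '\x2a\x00\x00\x00'
#     value = packer.unpack(data)[0]  # 42
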
class IndexedTermFeatureListDataPointType(DataPointType):
    pass

class IndexedTermFeatureListCorpus(IterableCorpus):
    """
    Term-feature instances, indexed by a dictionary, so that all that's stored is the indices of the
    terms and features and the feature counts for each instance. This is iterable, but, unlike
    TermFeatureListCorpus, doesn't iterate over documents. Now that we've filtered extracted features
    down to a smaller vocab, we put everything in one big file, with one data point per line.

    Since we're now storing indices, we can use a compact format that's fast to read from disk, making
    iterating over the dataset faster than if we had to read strings, look them up in the vocab, etc.

    By default, the ints are stored as C longs, which use 4 bytes. If you know you don't need ints this
    big, you can choose 1 or 2 bytes, or even 8 (long long). By default, the ints are unsigned, but
    they may be signed.

    """
    data_point_type = IndexedTermFeatureListDataPointType

    def __init__(self, *args, **kwargs):
        super(IndexedTermFeatureListCorpus, self).__init__(*args, **kwargs)
        self._term_dictionary = None
        self._feature_dictionary = None

    @property
    def term_dictionary(self):
        if self._term_dictionary is None:
            # Read in the dictionary from a file
            with open(os.path.join(self.data_dir, "term_dictionary"), "r") as f:
                self._term_dictionary = pickle.load(f)
        return self._term_dictionary

    @property
    def feature_dictionary(self):
        if self._feature_dictionary is None:
            # Read in the dictionary from a file
            with open(os.path.join(self.data_dir, "feature_dictionary"), "r") as f:
                self._feature_dictionary = pickle.load(f)
        return self._feature_dictionary

    def __iter__(self):
        # Check how many bytes to use per int
        bytes = self.metadata.get("bytes", 4)
        signed = self.metadata.get("signed", False)
        # Prepare a struct for reading one int at a time
        int_unpacker = get_struct(bytes, signed)
        # Prepare another struct for reading the number of feature counts in a data point
        length_unpacker = get_struct(2, False)

        with open(os.path.join(self.data_dir, "data"), "r") as data_file:
            try:
                while True:
                    # Read data for a single int - the term
                    term = _read_binary_int(data_file, int_unpacker)
                    # Next digit tells us the number of feature counts in this data point
                    num_features = _read_binary_int(data_file, length_unpacker)
                    # Now read two ints for every feature: the feature id and the count
                    feature_counts = dict(
                        (_read_binary_int(data_file, int_unpacker), _read_binary_int(data_file, int_unpacker))
                        for i in range(num_features)
                    )
                    yield term, feature_counts
            except StopIteration:
                # Reached end of file
                pass

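# Illustrative sketch (not part of the module): the binary "data" file read by __iter__ above is a
# flat sequence of records, each laid out as
#
#     [term id: N bytes] [num features: 2 bytes] [feature id: N bytes] [count: N bytes] ...
#
# where N is the "bytes" metadata value (4 by default) and the (feature id, count) pair is repeated
# num_features times.
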
class IndexedTermFeatureListCorpusWriter(IterableCorpusWriter):
    """
    index_input=True means that the input terms and feature names are already mapped to dictionary
    indices, so are assumed to be ints. Otherwise, inputs will be looked up in the appropriate
    dictionary to get an index.

    """
    def __init__(self, base_dir, term_dictionary, feature_dictionary, bytes=4, signed=False, index_input=False,
                 **kwargs):
        super(IndexedTermFeatureListCorpusWriter, self).__init__(base_dir, **kwargs)
        self.metadata["bytes"] = bytes
        self.metadata["signed"] = signed
        self.write_metadata()
        self.feature_dictionary = feature_dictionary
        self.term_dictionary = term_dictionary
        # Write the dictionaries out to disk straight away
        self.write_dictionaries()

        self.int_packer = get_struct(self.metadata["bytes"], self.metadata["signed"])
        self.length_packer = get_struct(2, False)
        self.index_input = index_input
        self._added_data_points = 0

    def write_dictionaries(self):
        with open(os.path.join(self.data_dir, "term_dictionary"), "w") as f:
            pickle.dump(self.term_dictionary, f, -1)
        with open(os.path.join(self.data_dir, "feature_dictionary"), "w") as f:
            pickle.dump(self.feature_dictionary, f, -1)

    def __enter__(self):
        self.data_file = open(os.path.join(self.data_dir, "data"), "w")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.data_file.close()
        # Set the length to be the number of data points written
        self.metadata["length"] = self._added_data_points
        super(IndexedTermFeatureListCorpusWriter, self).__exit__(exc_type, exc_val, exc_tb)

    def add_data_points(self, iterable):
        for term, feature_counts in iterable:
            feature_counts = feature_counts.items()
            if not self.index_input:
                # Look up indices in the dictionaries
                if term not in self.term_dictionary.token2id:
                    # Skip unknown terms
                    continue
                term = self.term_dictionary.token2id[term]
                # Look up all feature name indices, skipping unknown features
                feature_counts = [(self.feature_dictionary.token2id[f], cnt) for (f, cnt) in feature_counts
                                  if f in self.feature_dictionary.token2id]
            # Filter out zero counts
            feature_counts = [(f, c) for (f, c) in feature_counts if c > 0]
            if len(feature_counts) == 0:
                # If there aren't any features left after filtering on dictionary and counts, skip whole data point
                continue
            # Now we've converted to indices, write the data out
            # First write the term index
            _write_binary_int(self.data_file, self.int_packer, term)
            # Next the number of feature counts so we know how much to read when reading in
            _write_binary_int(self.data_file, self.length_packer, len(feature_counts))
            # Now all the features and their counts
            for f, c in feature_counts:
                _write_binary_int(self.data_file, self.int_packer, f)
                _write_binary_int(self.data_file, self.int_packer, c)
            # Keep a record of how much data we wrote so we can make the corpus length available to future modules
            self._added_data_points += 1

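# Illustrative sketch (not part of the module): writing an indexed corpus from (term, feature count
# dict) pairs. The dictionaries are assumed to be objects with a token2id mapping (as used above);
# the output path and data are hypothetical.
#
#     with IndexedTermFeatureListCorpusWriter("/path/to/output/dir",
#                                             term_dictionary, feature_dictionary) as writer:
#         writer.add_data_points([
#             ("cat", {"subj_of:sat": 2, "det:the": 1}),
#             ("mat", {"pobj_of:on": 1}),
#         ])
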
class FeatureListScoreDocumentType(RawDocumentType):
    """
    Document type that stores a list of features, each associated with a floating-point score. The
    feature lists are simply lists of indices to a feature set for the whole corpus that includes all
    feature types and which is stored along with the dataset. These may be binary features (present or
    absent for each data point), or may have a weight associated with them. If they are binary, the
    returned data will have a weight of 1 associated with each.

    A corpus of this type can be used to train, for example, a regression.

    If scores and weights are passed in as Decimal objects, they will be stored as strings. If they
    are floats, they will be converted to Decimals via their string representation (avoiding some of
    the oddness of converting between binary and decimal representations). To avoid loss of precision,
    pass in all scores and weights as Decimal objects.

    """
    formatters = [("features", "pimlico.datatypes.formatters.features.FeatureListScoreFormatter")]

    def __init__(self, options, metadata):
        super(FeatureListScoreDocumentType, self).__init__(options, metadata)
        self.features = self.metadata.get("features", [])
        self.separator = self.metadata.get("separator", ":")

    def process_document(self, doc):
        # Read a set of feature-value pairs from each line
        data_points = []
        for line in doc.splitlines():
            # Skip blank lines
            if line.strip():
                # The score is at the start of the line, before the list of features
                score, __, feature_str = line.partition(self.separator)
                # The score should always be a Decimal
                score = Decimal(score)

                weighted_features = []
                for feature in feature_str.strip().split(","):
                    if "=" in feature:
                        # This feature has a weight/count
                        feature, __, weight = feature.partition("=")
                        weight = Decimal(weight)
                    else:
                        # No weight given: default to 1
                        weight = Decimal(1)
                    # Features should be represented as ints: indices in feature list
                    weighted_features.append((int(feature), weight))
                data_points.append((score, weighted_features))
        return data_points

class FeatureListScoreCorpus(TarredCorpus):
    datatype_name = "scored_weight_feature_lists"
    data_point_type = FeatureListScoreDocumentType

class FeatureListScoreCorpusWriter(TarredCorpusWriter):
    """
    Input should be a list of data points. Each is a (score, feature list) pair, where score is a
    Decimal, or other numeric type. Feature list is a list of (feature name, weight) pairs, or just
    feature names. If weights are not given, they will default to 1 when read in (but no weight is
    stored).

    If index_input=True, it is assumed that feature IDs will be given instead of feature names.
    Otherwise, the feature names will be looked up in the feature list. Any features not found in the
    feature type list will simply be skipped.

    """
    def __init__(self, base_dir, features, separator=":", index_input=False, **kwargs):
        super(FeatureListScoreCorpusWriter, self).__init__(base_dir, **kwargs)
        self.index_input = index_input
        self.separator = separator
        self.features = features
        # Put the separator and feature list in the metadata, so we know how to read the data in again
        self.metadata["separator"] = separator
        self.metadata["features"] = features
        self.write_metadata()

    def _num_to_dec(self, num):
        if isinstance(num, Decimal):
            return num
        elif isinstance(num, float):
            # Convert floats to strings first, which deals with some binary weirdness, then to Decimals
            return Decimal(str(num))
        else:
            # Try using Decimal's own init to convert to a Decimal
            return Decimal(num)

    @pass_up_invalid
    def document_to_raw_data(self, doc):
        if self.index_input:
            # Expect indices to be given, so don't look up feature names
            _feature_to_id = lambda x: x
            _valid_feature = lambda x: True
        else:
            feature_dict = dict((feature, id) for (id, feature) in enumerate(self.features))
            # Shouldn't get a feature that isn't in the list of feature types, as we're skipping them
            _feature_to_id = lambda x: feature_dict[x]
            _valid_feature = lambda x: x[0] in feature_dict if type(x) is tuple else x in feature_dict
        # One data point per line
        return "\n".join(
            "%s: %s" % (
                self._num_to_dec(score),
                ",".join(
                    "%d=%s" % (_feature_to_id(feature[0]), self._num_to_dec(feature[1]))
                    if type(feature) is tuple else "%d" % _feature_to_id(feature)
                    for feature in features if _valid_feature(feature)
                )
            ) for (score, features) in doc
        )

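# Illustrative sketch (not part of the module): writing a scored feature-list corpus with the writer
# above. Feature names are looked up in the feature list passed to the writer; the path and data are
# hypothetical, and add_document() is assumed from the TarredCorpusWriter interface.
#
#     from decimal import Decimal
#
#     features = ["bias", "word:cat", "word:sat"]
#     doc = [
#         (Decimal("0.75"), ["bias", ("word:cat", Decimal("2"))]),
#         (Decimal("-1.5"), [("word:sat", Decimal("1"))]),
#     ]
#     with FeatureListScoreCorpusWriter("/path/to/output/dir", features) as writer:
#         writer.add_document("archive0", "doc0", doc)
#
# The first data point above would be written as the line "0.75: 0,1=2".
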
def _read_binary_int(f, unpacker):
    # Read the right number of bytes from the file
    string = f.read(unpacker.size)
    if string == "":
        raise StopIteration
    try:
        # Try decoding this as a single int
        integer = unpacker.unpack(string)[0]
    except struct.error, e:
        raise DatatypeLoadError("could not unpack integer data in file: corrupt data? %s" % e)
    return integer


def _write_binary_int(f, packer, x):
    string = packer.pack(x)
    f.write(string)


def escape_sep(sep, sep_type, text):
    return text.replace(sep, "~~%s~~" % sep_type)


def unescape_sep(sep, sep_type, text):
    return text.replace("~~%s~~" % sep_type, sep)