# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html
import cPickle as pickle
import os
import struct
from operator import itemgetter
from pimlico.datatypes.base import IterableCorpus, DatatypeLoadError, IterableCorpusWriter
from pimlico.datatypes.documents import RawDocumentType, DataPointType
from pimlico.datatypes.tar import TarredCorpus, TarredCorpusWriter, pass_up_invalid
__all__ = [
"KeyValueListCorpus", "KeyValueListCorpusWriter",
"TermFeatureListCorpus", "TermFeatureListCorpusWriter",
"IndexedTermFeatureListCorpus", "IndexedTermFeatureListCorpusWriter"
]
class KeyValueListDocumentType(RawDocumentType):
def __init__(self, options, metadata):
super(KeyValueListDocumentType, self).__init__(options, metadata)
self.separator = self.metadata.get("separator", " ")
self.fv_separator = self.metadata.get("fv_separator", "=")
def process_document(self, doc):
# Read a set of feature-value pairs from each line
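        # E.g., with the default separators, the line "colour=red size=10" parses to
        # the data point [("colour", "red"), ("size", "10")] (illustrative example)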
data_points = []
for line in doc.splitlines():
# Skip blank lines
if line.strip():
# Split up the various feature assignments
fvs = line.strip().split(self.separator)
                # Now that we've split on the separator, unescape any instances of it that were escaped
fvs = [unescape_sep(self.separator, "ITEMSEP", fv) for fv in fvs]
# Split each one into a feature-value pair
fvs = [itemgetter(0, 2)(fv.partition(self.fv_separator)) for fv in fvs]
# Unescape the fv sep within feature names and feature values
fvs = [
(unescape_sep(self.fv_separator, "FVSEP", fv[0]), unescape_sep(self.fv_separator, "FVSEP", fv[1]))
for fv in fvs
]
data_points.append(fvs)
return data_points
class KeyValueListCorpus(TarredCorpus):
datatype_name = "key_value_lists"
data_point_type = KeyValueListDocumentType
class KeyValueListCorpusWriter(TarredCorpusWriter):
def __init__(self, base_dir, separator=" ", fv_separator="=", **kwargs):
super(KeyValueListCorpusWriter, self).__init__(base_dir, **kwargs)
self.fv_separator = fv_separator
self.separator = separator
# Put the separators in the metadata, so we know how to read the data in again
self.metadata["separator"] = separator
self.metadata["fv_separator"] = fv_separator
self.write_metadata()
@pass_up_invalid
def document_to_raw_data(self, doc):
# Input should be a list of data points, where each is a list of feature-value pairs
# One data point per line
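        # E.g., with the default separators, [[("colour", "red"), ("note", "a=b")]]
        # serializes to the single line "colour=red note=a~~FVSEP~~b" (illustrative)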
return "\n".join([
            # Feature-value pairs are separated by the separator
self.separator.join([
# Make sure they don't include the separator
escape_sep(
self.separator, "ITEMSEP",
# Feature and value are separated by fv_separator
"%s%s%s" % (
# Make sure they don't include the fv separator
escape_sep(self.fv_separator, "FVSEP", feature_name),
self.fv_separator,
escape_sep(self.fv_separator, "FVSEP", feature_value)
)
) for (feature_name, feature_value) in data_point
]) for data_point in doc
])
class TermFeatureListDocumentType(KeyValueListDocumentType):
def process_document(self, doc):
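        # E.g. the line "term=dog bark=3 tail=1" yields the data point
        # ("dog", {"bark": 3, "tail": 1}) (illustrative example)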
raw_data_points = super(TermFeatureListDocumentType, self).process_document(doc)
data_points = []
for data_point in raw_data_points:
# Pull out the special "term" feature (usually at the beginning)
try:
term = (value for (feature, value) in data_point if feature == "term").next()
except StopIteration:
# No "term" feature found -- uh-oh! Catch as invalid doc
raise ValueError("data point has no 'term' feature: %s" % data_point)
# The rest of the features are feature counts
features = dict((feature, int(value)) for (feature, value) in data_point if feature != "term")
data_points.append((term, features))
return data_points
class TermFeatureListCorpus(KeyValueListCorpus):
"""
    Special case of KeyValueListCorpus in which one special feature, "term", is always present and the
    remaining features give counts of the occurrence of each feature with that term in the data point.
"""
datatype_name = "term_feature_lists"
data_point_type = TermFeatureListDocumentType
class TermFeatureListCorpusWriter(KeyValueListCorpusWriter):
def __init__(self, base_dir, **kwargs):
super(TermFeatureListCorpusWriter, self).__init__(base_dir, separator=" ", fv_separator="=", **kwargs)
@pass_up_invalid
def document_to_raw_data(self, doc):
        # Input should be a list of data points, where each is a (term, feature_counts) pair
        # and feature_counts is a dictionary mapping feature names to counts
data = [
[("term", term)] + [(feature, str(count)) for (feature, count) in feature_counts.items()]
for (term, feature_counts) in doc
]
return super(TermFeatureListCorpusWriter, self).document_to_raw_data(data)
BYTE_FORMATS = {
# (num bytes, signed)
(1, True): "b", # signed char
(1, False): "B", # unsigned char
(2, True): "h", # signed short
(2, False): "H", # unsigned short
(4, True): "l", # signed long
(4, False): "L", # unsigned long
(8, True): "q", # signed long long
(8, False): "Q", # unsigned long long
}
def get_struct(bytes, signed):
# Put together the formatting string for converting ints to bytes
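    # E.g. get_struct(2, False) returns a struct.Struct for the format "<H", which
    # packs an unsigned short into exactly 2 little-endian bytes: pack(513) -> "\x01\x02"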
if (bytes, signed) not in BYTE_FORMATS:
raise ValueError("invalid specification for int format: signed=%s, bytes=%s. signed must be bool, "
"bytes in [1, 2, 4, 8]" % (signed, bytes))
format_string = "<" + BYTE_FORMATS[(bytes, signed)]
# Compile the format for faster encoding
return struct.Struct(format_string)
class IndexedTermFeatureListDataPointType(DataPointType):
pass
class IndexedTermFeatureListCorpus(IterableCorpus):
"""
    Term-feature instances, indexed by a dictionary, so that all that's stored is the indices of the
    terms and features and the feature counts for each instance. This is iterable but, unlike
    TermFeatureListCorpus, doesn't iterate over documents: now that we've filtered extracted features
    down to a smaller vocab, we put everything in one big file, one data point after another.

    Since we're storing indices, we can use a compact binary format that's fast to read from disk,
    making iteration over the dataset faster than if we had to read strings and look them up in the
    vocab.

    By default, the ints are stored as C longs, which use 4 bytes. If you know you don't need ints
    that big, you can choose to use 1 or 2 bytes instead, or even 8 (long long). By default the ints
    are unsigned, but they may be stored signed.
"""
data_point_type = IndexedTermFeatureListDataPointType
def __init__(self, *args, **kwargs):
super(IndexedTermFeatureListCorpus, self).__init__(*args, **kwargs)
self._term_dictionary = None
self._feature_dictionary = None
@property
def term_dictionary(self):
if self._term_dictionary is None:
# Read in the dictionary from a file
            with open(os.path.join(self.data_dir, "term_dictionary"), "rb") as f:
self._term_dictionary = pickle.load(f)
return self._term_dictionary
@property
def feature_dictionary(self):
if self._feature_dictionary is None:
# Read in the dictionary from a file
            with open(os.path.join(self.data_dir, "feature_dictionary"), "rb") as f:
self._feature_dictionary = pickle.load(f)
return self._feature_dictionary
def __iter__(self):
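        # Each data point is laid out on disk as a flat sequence of little-endian ints:
        #   [term id] [num feature counts: 2-byte unsigned] then, per feature,
        #   [feature id] [count]
        # with term, feature and count ints sized according to the "bytes" and "signed"
        # metadata written by IndexedTermFeatureListCorpusWriter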
# Check how many bytes to use per int
bytes = self.metadata.get("bytes", 4)
signed = self.metadata.get("signed", False)
# Prepare a struct for reading one int at a time
int_unpacker = get_struct(bytes, signed)
# Prepare another struct for reading the number of feature counts in a data point
length_unpacker = get_struct(2, False)
        with open(os.path.join(self.data_dir, "data"), "rb") as data_file:
try:
while True:
# Read data for a single int - the term
term = _read_binary_int(data_file, int_unpacker)
                    # Next comes a 2-byte int telling us the number of feature counts in this data point
num_features = _read_binary_int(data_file, length_unpacker)
# Now read two ints for every feature: the feature id and the count
feature_counts = dict(
(_read_binary_int(data_file, int_unpacker), _read_binary_int(data_file, int_unpacker))
for i in range(num_features)
)
yield term, feature_counts
except StopIteration:
# Reached end of file
pass
class IndexedTermFeatureListCorpusWriter(IterableCorpusWriter):
"""
index_input=True means that the input terms and feature names are already mapped to dictionary indices, so
are assumed to be ints. Otherwise, inputs will be looked up in the appropriate dictionary to get an index.
"""
def __init__(self, base_dir, term_dictionary, feature_dictionary, bytes=4, signed=False, index_input=False,
**kwargs):
super(IndexedTermFeatureListCorpusWriter, self).__init__(base_dir, **kwargs)
self.metadata["bytes"] = bytes
self.metadata["signed"] = signed
self.write_metadata()
self.feature_dictionary = feature_dictionary
self.term_dictionary = term_dictionary
# Write the dictionaries out to disk straight away
self.write_dictionaries()
self.int_packer = get_struct(self.metadata["bytes"], self.metadata["signed"])
self.length_packer = get_struct(2, False)
self.index_input = index_input
self._added_data_points = 0
    def write_dictionaries(self):
        with open(os.path.join(self.data_dir, "term_dictionary"), "wb") as f:
pickle.dump(self.term_dictionary, f, -1)
        with open(os.path.join(self.data_dir, "feature_dictionary"), "wb") as f:
pickle.dump(self.feature_dictionary, f, -1)
def __enter__(self):
        self.data_file = open(os.path.join(self.data_dir, "data"), "wb")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.data_file.close()
# Set the length to be the number of data points written
self.metadata["length"] = self._added_data_points
super(IndexedTermFeatureListCorpusWriter, self).__exit__(exc_type, exc_val, exc_tb)
    def add_data_points(self, iterable):
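        # Expects an iterable of (term, feature_counts) pairs, where feature_counts maps
        # features (names, or dictionary indices if index_input=True) to integer counts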
for term, feature_counts in iterable:
feature_counts = feature_counts.items()
if not self.index_input:
# Look up indices in the dictionaries
if term not in self.term_dictionary.token2id:
# Skip unknown terms
continue
term = self.term_dictionary.token2id[term]
# Look up all feature name indices, skipping unknown features
feature_counts = [(self.feature_dictionary.token2id[f], cnt) for (f, cnt) in feature_counts
if f in self.feature_dictionary.token2id]
# Filter out zero counts
feature_counts = [(f, c) for (f, c) in feature_counts if c > 0]
if len(feature_counts) == 0:
# If there aren't any features left after filtering on dictionary and counts, skip whole data point
continue
# Now we've converted to indices, write the data out
# First write the term index
_write_binary_int(self.data_file, self.int_packer, term)
            # Next the number of feature counts, so we know how many ints to read back in
_write_binary_int(self.data_file, self.length_packer, len(feature_counts))
# Now all the features and their counts
for f, c in feature_counts:
_write_binary_int(self.data_file, self.int_packer, f)
_write_binary_int(self.data_file, self.int_packer, c)
# Keep a record of how much data we wrote so we can make the corpus length available to future modules
self._added_data_points += 1
def _read_binary_int(f, unpacker):
# Read the right number of bytes from the file
string = f.read(unpacker.size)
if string == "":
raise StopIteration
try:
# Try decoding this as a single int
integer = unpacker.unpack(string)[0]
except struct.error, e:
raise DatatypeLoadError("could not unpack integer data in file: corrupt data? %s" % e)
return integer
def _write_binary_int(f, packer, x):
string = packer.pack(x)
f.write(string)
def escape_sep(sep, sep_type, text):
return text.replace(sep, "~~%s~~" % sep_type)
def unescape_sep(sep, sep_type, text):
return text.replace("~~%s~~" % sep_type, sep)
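# E.g. escape_sep(" ", "ITEMSEP", "New York") returns "New~~ITEMSEP~~York" and
# unescape_sep(" ", "ITEMSEP", "New~~ITEMSEP~~York") restores "New York"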