Source code for pimlico.datatypes.embeddings

# This file is part of Pimlico
# Copyright (C) 2020 Mark Granroth-Wilding
# Licensed under the GNU LGPL v3.0 - https://www.gnu.org/licenses/lgpl-3.0.en.html

"""
Datatypes to store embedding vectors, together with their words.

The main datatype here, :class:`Embeddings`, should be used
for passing embeddings between modules.

We also provide a simple file collection datatype that stores the files used by
Tensorflow, for example, as input to the Tensorflow Projector.
Modules that need data in this format can use this datatype, which makes it
easy to convert from other formats.

"""
from __future__ import absolute_import

from builtins import str, input
from past.builtins import basestring
from builtins import object

import io
import os

from backports import csv

from pimlico.core.dependencies.python import numpy_dependency, PythonPackageOnPip, fasttext_dependency
from pimlico.datatypes import PimlicoDatatype
from pimlico.datatypes.files import NamedFileCollection
from pimlico.utils.core import cached_property

__all__ = ["Vocab", "Embeddings", "TSVVecFiles", "Word2VecFiles",
           "DocEmbeddingsMapper", "FixedEmbeddingsDocMapper", "FastTextDocMapper"]


class Vocab(object):
    """
    A single vocabulary item, used internally for collecting per-word frequency info.
    A simplified version of Gensim's ``Vocab``.

    :param word: the word itself, as unicode text
    :param index: index of the word within the vocabulary
    :param count: frequency count associated with the word (defaults to 0)

    """
    def __init__(self, word, index, count=0):
        self.word = word
        self.index = index
        self.count = count

    def __unicode__(self):
        return self.word

    def __str__(self):
        # Non-ASCII characters are replaced by "?". The encoded bytes are decoded
        # back to text because Python 3 raises a TypeError if __str__ returns bytes.
        # The result is pure ASCII, so this is also safe on Python 2.
        return self.word.encode("ascii", "replace").decode("ascii")
class Embeddings(PimlicoDatatype):
    """
    Datatype to store embedding vectors, together with their words.
    Based on Gensim's ``KeyedVectors`` object, but adapted for use in Pimlico
    and so as not to depend on Gensim. (This means that this can be used
    more generally for storing embeddings, even when we're not depending on Gensim.)

    Provides a method to map to Gensim's ``KeyedVectors`` type for compatibility.

    Doesn't provide all of the functionality of ``KeyedVectors``, since the main
    purpose of this is for storage of vectors and other functionality, like
    similarity computations, can be provided by utilities or by direct use of Gensim.

    Since we don't depend on Gensim, this datatype supports Python 2. However, if
    you try to use the mapping to Gensim's type, this will only work with
    Gensim installed and therefore also depends on Python 3.

    """
    datatype_name = "embeddings"
    datatype_supports_python2 = True

    def get_software_dependencies(self):
        """Return the inherited dependencies plus Numpy."""
        return super(Embeddings, self).get_software_dependencies() + [numpy_dependency]

    def get_writer_software_dependencies(self):
        """Return the inherited writer dependencies plus Numpy."""
        return super(Embeddings, self).get_writer_software_dependencies() + [numpy_dependency]

    class Reader(object):
        class Setup(object):
            def get_required_paths(self):
                """Names of the files that must be present for the data to be readable."""
                return ["vectors.npy", "vocab.csv"]

        @cached_property
        def vectors(self):
            """Array of vectors loaded from ``vectors.npy`` (pickled data is refused)."""
            import numpy
            with io.open(os.path.join(self.data_dir, "vectors.npy"), "rb") as f:
                return numpy.load(f, allow_pickle=False)

        @cached_property
        def normed_vectors(self):
            """Copy of :attr:`vectors` with every row scaled to unit Euclidean length."""
            import numpy
            vectors = self.vectors
            norms = numpy.sqrt((vectors ** 2.).sum(axis=1))
            # Build a new array instead of dividing in place: an in-place "/=" would
            # overwrite the array held by ``vectors``, so that the unnormalized
            # vectors would no longer be available afterwards
            return vectors / norms[:, numpy.newaxis]

        @cached_property
        def vector_size(self):
            """Dimensionality of the vectors (second axis of :attr:`vectors`)."""
            return self.vectors.shape[1]

        @cached_property
        def word_counts(self):
            """List of ``(word, count)`` pairs read from ``vocab.csv``, in file order."""
            with io.open(os.path.join(self.data_dir, "vocab.csv"), "r", encoding="utf-8", newline="") as f:
                reader = csv.reader(f)
                return [(row[0], int(row[1])) for row in reader]

        @cached_property
        def index2vocab(self):
            """List of :class:`Vocab` items, where each item's index is its position in the list."""
            return [Vocab(word, i, count=count) for i, (word, count) in enumerate(self.word_counts)]

        @cached_property
        def index2word(self):
            """List of words, in vocabulary index order."""
            return [v.word for v in self.index2vocab]

        @cached_property
        def vocab(self):
            """Dictionary mapping each word to its :class:`Vocab` item."""
            # Build the vocab by indexing the vocab items (in index2vocab) by word
            return dict((v.word, v) for v in self.index2vocab)

        def __len__(self):
            return len(self.index2vocab)

        def word_vec(self, word, norm=False):
            """
            Accept a single word as input.
            Returns the word's representation in vector space.

            Note that the lookup is delegated to :meth:`word_vecs` with a one-word
            list, so the result is a 2D numpy array with a single row
            (1 x vector_size), not a 1D array.

            :param word: word to look up
            :param norm: if True, take the vector from the length-normalized vectors
            :raises KeyError: if the word is not in the vocabulary

            """
            return self.word_vecs([word], norm=norm)

        def word_vecs(self, words, norm=False):
            """
            Accept multiple words as input.
            Returns the words' representations in vector space, as a 2D numpy
            array: #words x #vector_size. Rows are in the same order as the input.

            :param words: list of words to look up
            :param norm: if True, take the vectors from the length-normalized vectors
            :raises KeyError: if any of the words is not in the vocabulary

            """
            try:
                word_ids = [self.vocab[w].index for w in words]
            except KeyError as e:
                raise KeyError("word not in vocabulary: {}".format(e))
            if norm:
                return self.normed_vectors[word_ids]
            else:
                return self.vectors[word_ids]

        def to_keyed_vectors(self):
            """
            Build a Gensim ``KeyedVectors`` object containing these embeddings.
            Gensim is imported here, so it is only required if this method is called.

            """
            from gensim.models.keyedvectors import KeyedVectors, Vocab as GensimVocab
            kvecs = KeyedVectors(self.vector_size)
            index2vocab = [GensimVocab(word=v.word, count=v.count, index=v.index) for v in self.index2vocab]
            kvecs.vocab = dict((v.word, v) for v in index2vocab)
            kvecs.index2word = self.index2word
            kvecs.syn0 = self.vectors
            kvecs.init_sims()
            return kvecs

        def __getitem__(self, words):
            """
            Accept a single word or a list of words as input.

            A single word is treated as a shorthand for a one-item list, so in
            both cases the result is a 2D numpy array: #words x #vector_size.
            Matrix rows are in the same order as in input.

            Example::

              >>> trained_model['office']
              array([[ -1.40128313e-02, ...]])

              >>> trained_model[['office', 'products']]
              array([[ -1.40128313e-02, ...]
                     [ -1.70425311e-03, ...]
                      ...])

            """
            if isinstance(words, basestring):
                # allow calls like trained_model['office'], as a shorthand for trained_model[['office']]
                words = [words]
            return self.word_vecs(words)

        def __contains__(self, word):
            return word in self.vocab

    class Writer(object):
        # Both of these tasks must be completed for the output to be complete
        required_tasks = ["vocab", "vectors"]

        def write_vectors(self, arr):
            """
            Write out vectors from a Numpy array

            :param arr: array of vectors, stored to ``vectors.npy`` without pickling
            """
            import numpy
            with open(os.path.join(self.data_dir, "vectors.npy"), "wb") as f:
                numpy.save(f, arr, allow_pickle=False)
            self.task_complete("vectors")

        def write_word_counts(self, word_counts):
            """
            Write out vocab from a list of words with counts.

            :param word_counts: list of (unicode, int) pairs giving each word and its count. Vocab indices are
                determined by the order of words
            """
            with io.open(os.path.join(self.data_dir, "vocab.csv"), "w", encoding="utf-8", newline="") as f:
                writer = csv.writer(f)
                for word, count in word_counts:
                    writer.writerow([str(word), str(count)])
            self.task_complete("vocab")

        def write_vocab_list(self, vocab_items):
            """
            Write out vocab from a list of vocab items (see ``Vocab``).

            :param vocab_items: list of ``Vocab`` s
            """
            self.write_word_counts([(v.word, v.count) for v in vocab_items])

        def write_keyed_vectors(self, *kvecs):
            """
            Write both vectors and vocabulary straight from Gensim's ``KeyedVectors`` data structure.
            Can accept multiple objects, which will then be concatenated in the output.

            :param kvecs: one or more objects providing ``syn0``, ``vocab`` and ``index2word``
            """
            import numpy
            if len(kvecs) > 1:
                vecs = numpy.vstack(tuple(kv.syn0 for kv in kvecs))
            else:
                vecs = kvecs[0].syn0
            self.write_vectors(vecs)
            # Words are written in the same order as the stacked vectors
            self.write_word_counts([(w, kv.vocab[w].count) for kv in kvecs for w in kv.index2word])

    def run_browser(self, reader, opts):
        """
        Just output some info about the embeddings.

        We could also iterate through some of the words or provide other inspection tools,
        but for now we don't do that.

        :param reader: reader for the embeddings to inspect
        :param opts: browser options (not used here)
        """
        import numpy as np
        from sklearn.metrics import euclidean_distances
        from sklearn.metrics.pairwise import cosine_distances

        print("Reading embeddings...")
        num_vectors = len(reader)
        vector_size = reader.vector_size
        print("Embeddings for {:,} words, of {:,} dimensions".format(num_vectors, vector_size))

        # Select a few embeddings at random
        rand_ids = np.random.choice(num_vectors, 10)
        rand_vecs = reader.vectors[rand_ids]

        # The same report is produced once for each distance metric
        metrics = [("Euclidean", euclidean_distances), ("cosine", cosine_distances)]
        for metric_name, distance_fn in metrics:
            print("\nNearest neighbours for some random embeddings, using {} distance".format(metric_name))
            for rand_id, rand_vec in zip(rand_ids, rand_vecs):
                # Indices of all words, sorted by increasing distance from the sampled vector
                nns = np.argsort(distance_fn(rand_vec.reshape(1, -1), reader.vectors))[0]
                word, count = reader.word_counts[rand_id]
                print("  {} ({:,}): {}".format(
                    word, count, ", ".join(reader.word_counts[i][0] for i in nns[:10])
                ))
class TSVVecFiles(NamedFileCollection):
    """
    Embeddings stored in TSV files. This format is used by Tensorflow and can
    be used, for example, as input to the Tensorflow Projector.

    It's just a TSV file with each vector on a row, and another metadata TSV
    file with the names associated with the points and the counts. The counts
    are not necessary, so the metadata can be written without them if necessary.

    """
    datatype_name = "tsv_vec_files"
    datatype_supports_python2 = True

    def __init__(self, *args, **kwargs):
        # First file holds the vectors, second the metadata
        filenames = ["embeddings.tsv", "metadata.tsv"]
        super(TSVVecFiles, self).__init__(filenames, *args, **kwargs)

    class Reader(object):
        def get_embeddings_data(self):
            """Read the contents of the vectors file (first file in the collection)."""
            vectors_filename = self.filenames[0]
            return self.read_file(vectors_filename)

        def get_embeddings_metadata(self):
            """Read the contents of the metadata file (second file in the collection)."""
            metadata_filename = self.filenames[1]
            return self.read_file(metadata_filename)

    class Writer(object):
        def write_vectors(self, array):
            """
            Write the vectors file, one tab-separated row per vector.

            :param array: iterable of vectors, each an iterable of values
            """
            path = self.get_absolute_path(self.filenames[0])
            with io.open(path, "w", encoding="utf-8", newline="") as out_file:
                tsv_writer = csv.writer(out_file, dialect="excel-tab")
                # One output row per vector, each value converted to text
                tsv_writer.writerows([str(val) for val in vec] for vec in array)
            self.file_written(self.filenames[0])

        def write_vocab_with_counts(self, word_counts):
            """
            Write the metadata file with a header row and a word and count on each row.

            :param word_counts: iterable of (word, count) pairs
            """
            path = self.get_absolute_path(self.filenames[1])
            with io.open(path, "w", encoding="utf-8", newline="") as out_file:
                tsv_writer = csv.writer(out_file, dialect="excel-tab")
                tsv_writer.writerow([u"Word", u"Count"])
                tsv_writer.writerows([str(word), str(count)] for word, count in word_counts)
            self.file_written(self.filenames[1])

        def write_vocab_without_counts(self, words):
            """
            Write the metadata file with a header row and just a word on each row.

            :param words: iterable of words
            """
            path = self.get_absolute_path(self.filenames[1])
            with io.open(path, "w", encoding="utf-8", newline="") as out_file:
                tsv_writer = csv.writer(out_file, dialect="excel-tab")
                tsv_writer.writerow([u"Word"])
                tsv_writer.writerows([str(word)] for word in words)
            self.file_written(self.filenames[1])
class Word2VecFiles(NamedFileCollection):
    """
    Named file collection made up of two files: ``embeddings.bin`` and
    ``embeddings.vocab``.

    """
    datatype_name = "word2vec_files"
    datatype_supports_python2 = True

    def __init__(self, *args, **kwargs):
        filenames = ["embeddings.bin", "embeddings.vocab"]
        super(Word2VecFiles, self).__init__(filenames, *args, **kwargs)
class FastTextEmbeddings(PimlicoDatatype):
    """
    Simple wrapper for fastText embeddings, stored and written using the
    `fastText Python package <https://fasttext.cc/docs/en/python-module.html>`_.

    Depends on the fastText package, installed via Pip.

    """
    datatype_name = "fasttext_embeddings"
    datatype_supports_python2 = True

    def get_software_dependencies(self):
        """Return the inherited dependencies plus fastText."""
        inherited = super(FastTextEmbeddings, self).get_software_dependencies()
        return inherited + [fasttext_dependency]

    class Reader:
        def load_model(self):
            """Load the fastText model stored as ``model.bin`` in the data directory."""
            import fasttext
            return fasttext.load_model(os.path.join(self.data_dir, "model.bin"))

    class Writer:
        required_tasks = ["model"]

        def save_model(self, model):
            """
            Store a model as ``model.bin`` in the data directory, using the
            model's own ``save_model()`` method.

            :param model: model object to save
            """
            model.save_model(os.path.join(self.data_dir, "model.bin"))
            self.task_complete("model")
class DocEmbeddingsMapper(PimlicoDatatype):
    """
    Abstract datatype.

    An embedding loader provides a method to take a list of tokens
    (e.g. a tokenized document) and produce an embedding for each token.
    It will not necessarily be able to produce an embedding for *any*
    given term, so might return None for some tokens.

    This is more general than the :class:`~.Embeddings` datatype, as it
    allows this method to potentially produce embeddings for an
    infinite set of terms. Conversely, it is not able to say which set
    of terms it can produce embeddings for.

    It provides a unified interface to composed embeddings, like fastText,
    which can use sub-word information to produce embeddings of OOVs;
    context-sensitive embeddings, like BERT, which take into account
    the context of a token; and fixed embeddings, which just return a
    fixed embedding for in-vocab terms.

    Some subtypes are just wrappers for fixed sets of embeddings.

    """
    datatype_name = "doc_embeddings_mapper"

    def get_software_dependencies(self):
        """Return the inherited dependencies plus Numpy."""
        inherited = super(DocEmbeddingsMapper, self).get_software_dependencies()
        return inherited + [numpy_dependency]

    def run_browser(self, reader, opts):
        """
        Simple tool to display embeddings for the words of user-entered sentences.

        :param reader: reader whose ``get_embeddings()`` supplies the vectors
        :param opts: browser options (not used here)
        """
        print("Enter a sentence to see its word vectors. Ctrl+D to exit")
        try:
            while True:
                # Sentences are split into tokens on whitespace
                tokens = input("> ").split()
                token_vectors = reader.get_embeddings(tokens)
                for position, pair in enumerate(zip(tokens, token_vectors)):
                    token, vector = pair
                    print("{} {}: {}".format(position, token, vector))
        except EOFError:
            print("Exiting")

    class Reader:
        def get_embeddings(self, tokens):
            """
            Subclasses should produce a list, with an item for each token.
            The item may be None, or a numpy array containing a vector for the token.

            :param tokens: list of strings
            :return: list of embeddings
            """
            raise NotImplementedError("abstract datatype does not implement get_embeddings")
class FastTextDocMapper(DocEmbeddingsMapper):
    """
    Document embeddings mapper that gets a vector for every token from a
    fastText model stored as ``model.bin``.

    """
    datatype_name = "fasttext_doc_embeddings_mapper"

    def get_software_dependencies(self):
        """Return the inherited dependencies plus fastText."""
        inherited = super(FastTextDocMapper, self).get_software_dependencies()
        return inherited + [fasttext_dependency]

    class Reader:
        @cached_property
        def model(self):
            """fastText model loaded from ``model.bin`` in the data directory."""
            import fasttext
            return fasttext.load_model(os.path.join(self.data_dir, "model.bin"))

        def get_embeddings(self, tokens):
            """
            Look up each token's vector using the model's ``get_word_vector()``.

            :param tokens: list of strings
            :return: list with one vector per token, in the same order
            """
            token_vectors = []
            for token in tokens:
                token_vectors.append(self.model.get_word_vector(token))
            return token_vectors

    class Writer:
        required_tasks = ["model"]

        def save_model(self, model):
            """
            Store a model as ``model.bin`` in the data directory, using the
            model's own ``save_model()`` method.

            :param model: model object to save
            """
            model.save_model(os.path.join(self.data_dir, "model.bin"))
            self.task_complete("model")
class FixedEmbeddingsDocMapper(DocEmbeddingsMapper):
    """
    Document embeddings mapper backed by a fixed set of stored vectors
    (``vectors.npy``) and their vocabulary (``vocab.csv``). Tokens that are
    not in the vocabulary get ``None`` instead of a vector.

    """
    datatype_name = "fixed_embeddings_doc_embeddings_mapper"

    class Reader(object):
        class Setup(object):
            def get_required_paths(self):
                """Names of the files that must be present for the data to be readable."""
                return ["vectors.npy", "vocab.csv"]

        @cached_property
        def vectors(self):
            """Array of vectors loaded from ``vectors.npy`` (pickled data is refused)."""
            import numpy
            vectors_path = os.path.join(self.data_dir, "vectors.npy")
            with io.open(vectors_path, "rb") as vectors_file:
                return numpy.load(vectors_file, allow_pickle=False)

        @cached_property
        def vector_size(self):
            """Dimensionality of the vectors (second axis of :attr:`vectors`)."""
            return self.vectors.shape[1]

        @cached_property
        def word_counts(self):
            """List of ``(word, count)`` pairs read from ``vocab.csv``, in file order."""
            vocab_path = os.path.join(self.data_dir, "vocab.csv")
            with io.open(vocab_path, "r", encoding="utf-8", newline="") as vocab_file:
                return [(row[0], int(row[1])) for row in csv.reader(vocab_file)]

        @cached_property
        def index2vocab(self):
            """List of :class:`Vocab` items, where each item's index is its position in the list."""
            return [
                Vocab(word, position, count=count)
                for position, (word, count) in enumerate(self.word_counts)
            ]

        @cached_property
        def index2word(self):
            """List of words, in vocabulary index order."""
            return [item.word for item in self.index2vocab]

        @cached_property
        def vocab(self):
            """Dictionary mapping each word to its :class:`Vocab` item."""
            # Index the vocab items (in index2vocab) by their word
            return {item.word: item for item in self.index2vocab}

        def __len__(self):
            return len(self.index2vocab)

        def word_vec(self, word):
            """
            Accept a single word as input.
            Returns the word's representation in vector space, as a 1D numpy array.

            :param word: word to look up
            :raises KeyError: if the word is not in the vocabulary
            """
            try:
                word_id = self.vocab[word].index
            except KeyError as e:
                raise KeyError("word not in vocabulary: {}".format(e))
            return self.vectors[word_id]

        def __contains__(self, word):
            return word in self.vocab

        def get_embeddings(self, tokens):
            """
            Produce an item for each token: its vector if the token is in the
            vocabulary, otherwise None.

            :param tokens: list of strings
            :return: list of embeddings
            """
            token_vectors = []
            for token in tokens:
                if token in self:
                    token_vectors.append(self.word_vec(token))
                else:
                    token_vectors.append(None)
            return token_vectors

    class Writer(object):
        required_tasks = ["vocab", "vectors"]

        def write_vectors(self, arr):
            """
            Write out vectors from a Numpy array

            :param arr: array of vectors, stored to ``vectors.npy`` without pickling
            """
            import numpy
            vectors_path = os.path.join(self.data_dir, "vectors.npy")
            with open(vectors_path, "wb") as vectors_file:
                numpy.save(vectors_file, arr, allow_pickle=False)
            self.task_complete("vectors")

        def write_word_counts(self, word_counts):
            """
            Write out vocab from a list of words with counts.

            :param word_counts: list of (unicode, int) pairs giving each word and its count. Vocab indices are
                determined by the order of words
            """
            vocab_path = os.path.join(self.data_dir, "vocab.csv")
            with io.open(vocab_path, "w", encoding="utf-8", newline="") as vocab_file:
                csv_writer = csv.writer(vocab_file)
                for word, count in word_counts:
                    row = [str(word), str(count)]
                    csv_writer.writerow(row)
            self.task_complete("vocab")

        def write_vocab_list(self, vocab_items):
            """
            Write out vocab from a list of vocab items (see ``Vocab``).

            :param vocab_items: list of ``Vocab`` s
            """
            pairs = [(item.word, item.count) for item in vocab_items]
            self.write_word_counts(pairs)

        def write_keyed_vectors(self, *kvecs):
            """
            Write both vectors and vocabulary straight from Gensim's ``KeyedVectors`` data structure.
            Can accept multiple objects, which will then be concatenated in the output.

            :param kvecs: one or more objects providing ``syn0``, ``vocab`` and ``index2word``
            """
            import numpy
            if len(kvecs) <= 1:
                vecs = kvecs[0].syn0
            else:
                vecs = numpy.vstack(tuple(kv.syn0 for kv in kvecs))
            self.write_vectors(vecs)

            # Words are collected in the same order as the stacked vectors
            pairs = []
            for kv in kvecs:
                for word in kv.index2word:
                    pairs.append((word, kv.vocab[word].count))
            self.write_word_counts(pairs)