# Source code for ffp.embeddings

"""
Finalfusion Embeddings
"""
from os import PathLike
from typing import Optional, Union, Tuple, List, BinaryIO

import numpy as np

from ffp.io import Chunk, ChunkIdentifier, Header, _read_binary, _read_chunk_header
from ffp.metadata import Metadata
from ffp.norms import Norms
from ffp.storage import Storage, NdArray, QuantizedArray
from ffp.subwords import FastTextIndexer
from ffp.vocab import Vocab, FastTextVocab, FinalfusionBucketVocab, SimpleVocab, ExplicitVocab


class Embeddings:  # pylint: disable=too-many-instance-attributes
    """
    Embeddings class.

    Typically consists of a :class:`~ffp.storage.storage.Storage` and
    :class:`~ffp.vocab.vocab.Vocab`. Other possible chunks are
    :class:`ffp.norms.Norms` corresponding to the embeddings of the in-vocab
    tokens and :class:`~ffp.metadata.Metadata`.

    If a vocabulary and a storage are provided, embeddings can be retrieved
    through three methods:

    1. :meth:`Embeddings.embedding` allows to provide a default value and
       returns this value if no embedding could be found.
    2. :meth:`Embeddings.__getitem__` retrieves an embedding for the query but
       raises an exception if it cannot retrieve an embedding.
    3. :meth:`Embeddings.embedding_with_norm` requires a
       :class:`~ffp.norms.Norms` chunk and returns an embedding together with
       the corresponding L2 norm.

    Embeddings wrap any combination of the 4 chunk types:

    1. :class:`~ffp.storage.Storage`, either
       :class:`~ffp.storage.ndarray.NdArray` or
       :class:`~ffp.storage.quantized.QuantizedArray`
    2. :class:`~ffp.vocab.Vocab`, one of
       :class:`~ffp.vocab.simple_vocab.SimpleVocab`,
       :class:`~ffp.vocab.subword.FinalfusionBucketVocab`,
       :class:`~ffp.vocab.subword.FastTextVocab` and
       :class:`~ffp.vocab.subword.ExplicitVocab`
    3. :class:`~ffp.norms.Norms`
    4. :class:`~ffp.metadata.Metadata`

    Examples
    --------
    >>> storage = NdArray(np.float32(np.random.rand(2, 10)))
    >>> vocab = SimpleVocab(["Some", "words"])
    >>> metadata = Metadata({"Some": "value", "numerical": 0})
    >>> norms = Norms(np.float32(np.random.rand(2)))
    >>> embeddings = Embeddings(storage=storage, vocab=vocab, metadata=metadata, norms=norms)
    >>> embeddings.vocab.words
    ['Some', 'words']
    >>> np.allclose(embeddings["Some"], storage[0])
    True
    >>> try:
    ...     embeddings["oov"]
    ... except KeyError:
    ...     True
    True
    >>> _, n = embeddings.embedding_with_norm("Some")
    >>> np.isclose(n, norms[0])
    True
    >>> embeddings.metadata
    {'Some': 'value', 'numerical': 0}
    """

    def __init__(self,
                 storage: Optional[Storage] = None,
                 vocab: Optional[Vocab] = None,
                 norms: Optional[Norms] = None,
                 metadata: Optional[Metadata] = None):
        """
        Initialize Embeddings.

        Initializes Embeddings with the given chunks.

        :Conditions:
            The following conditions need to hold if the respective chunks are
            passed.

            * Chunks need to have the expected type.
            * ``vocab.idx_bound == storage.shape[0]``
            * ``len(vocab) == len(norms)``
            * ``len(norms) <= storage.shape[0]``

        Parameters
        ----------
        storage : Storage, optional
            Embeddings Storage.
        vocab : Vocab, optional
            Embeddings Vocabulary.
        norms : Norms, optional
            Embeddings Norms.
        metadata : Metadata, optional
            Embeddings Metadata.

        Raises
        ------
        AssertionError
            If any of the conditions don't hold.
        """
        # The checks stay ``assert`` statements because AssertionError is the
        # documented contract of this constructor.
        assert storage is None or isinstance(
            storage, Storage), "storage is required to be Storage"
        assert vocab is None or isinstance(
            vocab, Vocab), "vocab is required to be Vocab"
        assert norms is None or isinstance(
            norms, Norms), "norms is required to be Norms"
        assert metadata is None or isinstance(
            metadata, Metadata), "metadata is required to be Metadata"
        if vocab is not None and storage is not None:
            assert storage.shape[0] == vocab.idx_bound, \
                "Number of embeddings needs to be equal to vocab's idx_bound"
        if vocab is not None and norms is not None:
            assert len(vocab) == len(
                norms), "Vocab length needs to be equal to number of norms."
        if storage is not None and norms is not None:
            assert storage.shape[0] >= len(
                norms
            ), "Number of embeddings needs to be greater than or equal to number of norms."
        self._storage = storage
        self._vocab = vocab
        self._norms = norms
        self._metadata = metadata

    def __getitem__(self, item: str) -> np.ndarray:
        """
        Returns an embedding.

        If the vocabulary maps the item to several indices, the looked-up
        rows are summed and the sum is scaled to unit L2 norm.

        Parameters
        ----------
        item : str
            The query item.

        Returns
        -------
        embedding : numpy.ndarray
            The embedding.

        Raises
        ------
        KeyError
            If no embedding could be retrieved.

        See Also
        --------
        :func:`~Embeddings.embedding`
        :func:`~Embeddings.embedding_with_norm`
        """
        # no need to check for none since Vocab raises KeyError if it can't produce indices
        idx = self._vocab[item]
        res = self._storage[idx]
        if res.ndim == 1:
            return res
        embed_sum = res.sum(axis=0)
        return embed_sum / np.linalg.norm(embed_sum)

    def embedding(self,
                  word: str,
                  out: Optional[np.ndarray] = None,
                  default: Optional[np.ndarray] = None
                  ) -> Optional[np.ndarray]:
        """
        Embedding lookup.

        Looks up the embedding for the input word.

        If an `out` array is specified, the embedding is written into the array.

        If it is not possible to retrieve an embedding for the input word, the `default`
        value is returned. This defaults to `None`. An embedding can not be retrieved if
        the vocabulary cannot provide an index for `word`.

        This method fails if either the storage or vocab are not set.

        Parameters
        ----------
        word : str
            The query word.
        out : numpy.ndarray, optional
            Optional output array to write the embedding into.
        default : numpy.ndarray, any, optional
            Optional default value to return if no embedding can be retrieved. Defaults to
            None. If both `out` and `default` are given, the default is copied into `out`.

        Returns
        -------
        embedding : numpy.ndarray, optional
            The retrieved embedding or the default value.

        Examples
        --------
        >>> matrix = np.float32(np.random.rand(2, 10))
        >>> storage = NdArray(matrix)
        >>> vocab = SimpleVocab(["Some", "words"])
        >>> embeddings = Embeddings(storage=storage, vocab=vocab)
        >>> np.allclose(embeddings.embedding("Some"), matrix[0])
        True
        >>> # default value is None
        >>> embeddings.embedding("oov") is None
        True
        >>> # It's possible to specify a default value
        >>> default = embeddings.embedding("oov", default=storage[0])
        >>> np.allclose(default, storage[0])
        True
        >>> # Embeddings can be written to an output buffer.
        >>> out = np.zeros(10, dtype=np.float32)
        >>> out2 = embeddings.embedding("Some", out=out)
        >>> out is out2
        True
        >>> np.allclose(out, matrix[0])
        True

        See Also
        --------
        :func:`~Embeddings.embedding_with_norm`
        :func:`~Embeddings.__getitem__`
        """
        idx = self._vocab.idx(word)
        if idx is None:
            if out is not None and default is not None:
                out[:] = default
                return out
            return default
        res = self._storage[idx]
        if res.ndim == 1:
            if out is not None:
                out[:] = res
            else:
                out = res
        else:
            # Several rows were looked up: sum them (into `out` if given) and
            # scale the sum to unit L2 norm in place.
            out = np.add.reduce(res, 0, out=out, keepdims=False)
            out /= np.linalg.norm(out)
        return out

    def embedding_with_norm(self,
                            word: str,
                            out: Optional[np.ndarray] = None,
                            default: Optional[Tuple[np.ndarray, float]] = None
                            ) -> Optional[Tuple[np.ndarray, float]]:
        """
        Embedding lookup with norm.

        Looks up the embedding for the input word together with its norm.

        If an `out` array is specified, the embedding is written into the array.

        If it is not possible to retrieve an embedding for the input word, the `default`
        value is returned. This defaults to `None`. An embedding can not be retrieved if
        the vocabulary cannot provide an index for `word`.

        This method fails if either storage, vocab or norms are not set.

        Parameters
        ----------
        word : str
            The query word.
        out : numpy.ndarray, optional
            Optional output array to write the embedding into.
        default : Tuple[numpy.ndarray, float], optional
            Optional default ``(embedding, norm)`` pair to return if no embedding can be
            retrieved. Defaults to None.

        Returns
        -------
        (embedding, norm) : tuple, optional
            Tuple with the retrieved embedding or the default value at the first index and
            the norm at the second index.

        Raises
        ------
        TypeError
            If the embeddings don't contain a norms chunk.

        See Also
        --------
        :func:`~Embeddings.embedding`
        :func:`~Embeddings.__getitem__`
        """
        if self._norms is None:
            raise TypeError("embeddings don't contain norms chunk")
        idx = self._vocab.idx(word)
        if idx is None:
            if out is not None and default is not None:
                out[:] = default[0]
                return out, default[1]
            return default
        res = self._storage[idx]
        if res.ndim == 1:
            if out is not None:
                out[:] = res
            else:
                out = res
            return out, self._norms[idx]
        # Several rows were looked up: the norm is computed from their sum
        # rather than read from the norms chunk.
        out = np.add.reduce(res, 0, out=out, keepdims=False)
        norm = np.linalg.norm(out)
        out /= norm
        return out, norm

    @property
    def storage(self) -> Optional[Storage]:
        """
        Get the :class:`Embeddings` :class:`ffp.storage.storage.Storage`.

        Returns None if no storage is set.

        :Setter: Sets a new storage.
        :Getter: Get the storage.

        Returns
        -------
        storage : Storage, optional
            The embeddings storage.

        Raises
        ------
        AssertionError
            if ``embeddings.storage.shape[0] != embeddings.vocab.idx_bound`` or
            ``len(embeddings.norms) > embeddings.storage.shape[0]``
        TypeError
            If storage is neither a Storage nor None.
        """
        return self._storage

    @storage.setter
    def storage(self, storage: Optional[Storage]):
        if storage is None:
            self._storage = None
        elif isinstance(storage, Storage):
            if self._norms is not None:
                assert storage.shape[0] >= len(
                    self._norms
                ), "Number of embeddings needs to be greater than or equal to number of norms."
            if self._vocab is not None:
                assert storage.shape[0] == self._vocab.idx_bound, \
                    "Number of embeddings needs to be equal to vocab's idx_bound"
            self._storage = storage
        else:
            # The message names the type this setter actually accepts.
            raise TypeError("Expected 'None' or 'Storage'.")

    @property
    def vocab(self) -> Optional[Vocab]:
        """
        The :class:`~ffp.vocab.vocab.Vocab`.

        :Getter: Returns None or the Vocabulary.
        :Setter: Set the vocabulary.

        Returns
        -------
        vocab : Vocab, optional
            The vocabulary or `None`.

        Raises
        ------
        AssertionError
            if ``embeddings.storage.shape[0] != embeddings.vocab.idx_bound`` or
            ``len(embeddings.norms) != len(embeddings.vocab)``
        TypeError
            If vocab is neither a Vocab nor None.

        Examples
        --------
        >>> words = ['Some', 'words']
        >>> vocab = SimpleVocab(words)
        >>> embeddings = Embeddings(vocab=vocab)
        >>> embeddings.vocab.words
        ['Some', 'words']
        >>> embeddings.vocab['Some']
        0
        """
        return self._vocab

    @vocab.setter
    def vocab(self, vocab: Optional[Vocab]):
        if vocab is None:
            self._vocab = None
        elif isinstance(vocab, Vocab):
            if self._norms is not None:
                assert len(vocab) == len(
                    self._norms
                ), "Vocab length needs to be equal to number of norms."
            if self._storage is not None:
                # pylint: disable=unsubscriptable-object
                assert self._storage.shape[0] == vocab.idx_bound, \
                    "Vocab's idx_bound needs to be equal to number of embeddings."
            self._vocab = vocab
        else:
            raise TypeError("Expected 'None' or 'Vocab'.")

    @property
    def norms(self) -> Optional[Norms]:
        """
        The :class:`~ffp.norms.Norms`.

        :Getter: Returns None or the Norms.
        :Setter: Set the Norms.

        Returns
        -------
        norms : Norms, optional
            The Norms or None.

        Raises
        ------
        AssertionError
            if ``embeddings.storage.shape[0] < len(embeddings.norms)`` or
            ``len(embeddings.norms) != len(embeddings.vocab)``
        TypeError
            If ``norms`` is neither Norms nor None.

        Examples
        --------
        >>> norms = Norms(np.float32(np.abs(np.random.rand(5))))
        >>> embeddings = Embeddings()
        >>> embeddings.norms = norms
        >>> np.isclose(embeddings.norms[0], norms[0])
        True
        """
        return self._norms

    @norms.setter
    def norms(self, norms: Optional[Norms]):
        if norms is None:
            self._norms = None
        elif isinstance(norms, Norms):
            if self._vocab is not None:
                assert len(self._vocab) == len(
                    norms), "Vocab and norms need to have same length"
            if self._storage is not None:
                # pylint: disable=unsubscriptable-object
                assert self._storage.shape[0] >= len(
                    norms
                ), "Number of norms needs to be equal to or less than number of embeddings"
            self._norms = norms
        else:
            raise TypeError("Expected 'None' or 'Norms'.")

    @property
    def metadata(self) -> Optional[Metadata]:
        """
        The :class:`~ffp.metadata.Metadata`.

        :Getter: Returns None or the Metadata.
        :Setter: Set the Metadata.

        Returns
        -------
        metadata : Metadata, optional
            The Metadata or None.

        Raises
        ------
        TypeError
            If ``metadata`` is neither Metadata nor None.

        Examples
        --------
        >>> metadata = Metadata({"test": "value", "num": -1})
        >>> embeddings = Embeddings()
        >>> embeddings.metadata = metadata
        >>> embeddings.metadata
        {'test': 'value', 'num': -1}
        """
        return self._metadata

    @metadata.setter
    def metadata(self, metadata: Optional[Metadata]):
        if metadata is None:
            self._metadata = None
        elif isinstance(metadata, Metadata):
            self._metadata = metadata
        else:
            raise TypeError("Expected 'None' or 'Metadata'.")

    def bucket_to_explicit(self) -> 'Embeddings':
        """
        Convert bucket embeddings to embeddings with explicit lookup.

        Multiple embeddings can still map to the same bucket, but all buckets that are not
        indexed by in-vocabulary n-grams are eliminated. This can have a big impact on the
        size of the embedding matrix.

        A side effect of this method is the conversion from a quantized storage to an
        array storage. The returned Embeddings are built from the converted vocabulary
        and storage only; norms and metadata are not passed on.

        Returns
        -------
        embeddings : Embeddings
            Embeddings with an ExplicitVocab instead of a hash-based vocabulary.

        Raises
        ------
        TypeError
            If the current vocabulary is not a hash-based vocabulary
            (FinalfusionBucketVocab or FastTextVocab)
        """
        bucket_vocabs = (FastTextVocab, FinalfusionBucketVocab)
        if not isinstance(self._vocab, bucket_vocabs):
            raise TypeError(
                "Only bucketed embeddings can be converted to explicit.")
        vocab = self._vocab.to_explicit()
        if self._storage is None:
            return Embeddings(vocab=vocab)
        storage = np.zeros((vocab.idx_bound, self._storage.shape[1]),
                           dtype=np.float32)
        # Word rows keep their position; only the subword rows are remapped.
        storage[:len(vocab)] = self._storage[:len(vocab)]
        for ngram in vocab.subword_indexer:
            storage[len(vocab) + vocab.subword_indexer(ngram)] = self._storage[
                len(vocab) + self._vocab.subword_indexer(ngram)]
        return Embeddings(vocab=vocab, storage=NdArray(storage))

    def chunks(self) -> List[Chunk]:
        """
        Get the Embeddings Chunks as a list.

        The Chunks are ordered in the expected serialization order:

        1. Metadata
        2. Vocabulary
        3. Storage
        4. Norms

        Chunks that are not set are left out.

        Returns
        -------
        chunks : List[Chunk]
            List of embeddings chunks.
        """
        # The append order below is the order in which `write` serializes the
        # chunks, so it has to follow the list documented above.
        chunks = []
        if self._metadata is not None:
            chunks.append(self._metadata)
        if self._vocab is not None:
            chunks.append(self._vocab)
        if self._storage is not None:
            chunks.append(self._storage)
        if self._norms is not None:
            chunks.append(self._norms)
        return chunks

    def write(self, file: str):
        """
        Write the Embeddings to the given file.

        Writes a header listing the chunk identifiers, followed by every chunk
        returned by :meth:`Embeddings.chunks`, to a finalfusion file at the
        given path.

        Parameters
        ----------
        file : str
            Path of the output file.
        """
        with open(file, 'wb') as outf:
            chunks = self.chunks()
            header = Header([chunk.chunk_identifier() for chunk in chunks])
            header.write_chunk(outf)
            for chunk in chunks:
                chunk.write_chunk(outf)

    def __contains__(self, item):
        """Return whether the vocabulary contains `item`; TypeError without a vocab."""
        if self._vocab is None:
            raise TypeError("These embeddings don't contain a vocabulary")
        return item in self._vocab

    def __repr__(self):
        """Return the reprs of all set chunks, one per line, wrapped in braces."""
        return "Embeddings { \n" + "\n".join(
            [repr(chunk) for chunk in self.chunks()]) + "\n}"

    def __iter__(self):
        """
        Iterate over ``(word, embedding)`` pairs, or over
        ``(word, embedding, norm)`` triples if norms are set.
        """
        if self._norms is not None:
            return zip(self._vocab.words, self._storage, self._norms)
        return zip(self._vocab.words, self._storage)
def load_finalfusion(file: Union[str, bytes, int, PathLike],
                     mmap: bool = False) -> Embeddings:
    """
    Read embeddings from a file in finalfusion format.

    After the file header, chunks are read one by one until
    ``_read_chunk_header`` returns ``None``. Each chunk is dispatched on its
    identifier and stored on the returned Embeddings.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in finalfusion format.
    mmap : bool
        Toggles memory mapping the storage buffer.

    Returns
    -------
    embeddings : Embeddings
        The embeddings from the input file.

    Raises
    ------
    TypeError
        If the file contains a chunk with an unknown identifier.
    """
    with open(file, 'rb') as inf:
        _ = Header.read_chunk(inf)
        embeddings = Embeddings()
        # Every header read goes through the same None check, including the
        # first one.
        chunk_header = _read_chunk_header(inf)
        while chunk_header is not None:
            chunk_id, _ = chunk_header
            if chunk_id.is_storage():
                embeddings.storage = _STORAGE_READERS[chunk_id](inf, mmap)
            elif chunk_id.is_vocab():
                embeddings.vocab = _VOCAB_READERS[chunk_id](inf)
            elif chunk_id == ChunkIdentifier.NdNorms:
                embeddings.norms = Norms.read_chunk(inf)
            elif chunk_id == ChunkIdentifier.Metadata:
                embeddings.metadata = Metadata.read_chunk(inf)
            else:
                # Report the identifier that was actually unrecognised; do not
                # read another header first.
                raise TypeError("Unknown chunk type: " + str(chunk_id))
            chunk_header = _read_chunk_header(inf)
        return embeddings
def load_word2vec(file: Union[str, bytes, int, PathLike]) -> Embeddings:
    """
    Read embeddings in word2vec binary format.

    The first line of the file holds the number of rows and columns as utf-8
    text. Each entry consists of a utf-8 encoded word, a single space and
    then ``cols`` float32 components. Newline bytes in front of or inside a
    word are dropped. The matrix is l2-normalized row-wise and the original
    norms are kept in a Norms chunk.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in word2vec binary format.

    Returns
    -------
    embeddings : Embeddings
        The embeddings from the input file.

    Raises
    ------
    EOFError
        If the file ends while a word is being read.
    """
    vocab_words = []
    with open(file, 'rb') as inf:
        shape_fields = inf.readline().decode("utf-8").split()
        rows, cols = map(int, shape_fields)
        matrix = np.zeros((rows, cols), dtype=np.float32)
        bytes_per_row = cols * matrix.itemsize
        for row in range(rows):
            raw_word = []
            # Consume single bytes up to (and including) the separating space.
            for byte in iter(lambda: inf.read(1), b' '):
                if byte == b'':
                    raise EOFError
                if byte != b'\n':
                    raw_word.append(byte)
            vocab_words.append(b''.join(raw_word).decode('utf-8'))
            matrix[row] = np.frombuffer(inf.read(bytes_per_row),
                                        dtype=np.float32)
    norms = np.linalg.norm(matrix, axis=1)
    matrix /= np.expand_dims(norms, axis=1)
    return Embeddings(storage=NdArray(matrix),
                      norms=Norms(norms),
                      vocab=SimpleVocab(vocab_words))
def load_textdims(file: Union[str, bytes, int, PathLike]) -> Embeddings:
    """
    Read embeddings in textdims format.

    The first line contains whitespace separated rows and cols, the rest of the file
    contains whitespace separated word and vector components. The file is read as
    utf-8. The matrix is l2-normalized row-wise and the original norms are kept in a
    Norms chunk.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in textdims format.

    Returns
    -------
    embeddings : Embeddings
        The embeddings from the input file.
    """
    words = []
    # Decode explicitly as utf-8 instead of relying on the locale's default
    # encoding.
    with open(file, encoding='utf8') as inf:
        rows, cols = next(inf).split()
        matrix = np.zeros((int(rows), int(cols)), dtype=np.float32)
        for i, line in enumerate(inf):
            line = line.strip().split()
            words.append(line[0])
            matrix[i] = line[1:]
    norms = np.linalg.norm(matrix, axis=1)
    matrix /= np.expand_dims(norms, axis=1)
    return Embeddings(storage=NdArray(matrix),
                      norms=Norms(norms),
                      vocab=SimpleVocab(words))
def load_text(file: Union[str, bytes, int, PathLike]) -> Embeddings:
    """
    Read embeddings in text format.

    Every line contains a whitespace separated word followed by its vector
    components. The file is read as utf-8. The matrix is l2-normalized row-wise and
    the original norms are kept in a Norms chunk.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in text format.

    Returns
    -------
    embeddings : Embeddings
        Embeddings from the input file. The resulting Embeddings will have a
        SimpleVocab, NdArray and Norms.
    """
    words = []
    vecs = []
    # Decode explicitly as utf-8 instead of relying on the locale's default
    # encoding.
    with open(file, encoding='utf8') as inf:
        for line in inf:
            line = line.strip().split()
            words.append(line[0])
            vecs.append(line[1:])
    matrix = np.array(vecs, dtype=np.float32)
    norms = np.linalg.norm(matrix, axis=1)
    matrix /= np.expand_dims(norms, axis=1)
    return Embeddings(storage=NdArray(matrix),
                      norms=Norms(norms),
                      vocab=SimpleVocab(words))
def load_fastText(file: Union[str, bytes, int, PathLike]) -> Embeddings:  # pylint: disable=invalid-name
    """
    Read embeddings from a file in fastText format.

    Reads header, config, vocabulary and the embedding matrix. Every word row
    is then replaced by the mean of that row and the rows of the word's
    subword indices, after which the word rows are l2-normalized. The norms
    of the word rows are kept in a Norms chunk and the config is kept as
    Metadata.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in fastText format.

    Returns
    -------
    embeddings : Embeddings
        The embeddings from the input file.

    Raises
    ------
    NotImplementedError
        If the file is flagged as holding a quantized matrix.
    """
    with open(file, 'rb') as inf:
        _read_ft_header(inf)
        metadata = _read_ft_cfg(inf)
        vocab = _read_ft_vocab(inf, metadata['buckets'], metadata['min_n'],
                               metadata['max_n'])
        is_quantized = _read_binary(inf, "<B")[0]
        if is_quantized:
            raise NotImplementedError(
                "Quantized storage is not supported for fastText models")
        rows, cols = _read_binary(inf, "<QQ")
        flat = np.fromfile(file=inf, count=rows * cols, dtype=np.float32)
        matrix = np.reshape(flat, (rows, cols))
    # Fold the subword rows into each word row.
    for i, word in enumerate(vocab):
        indices = [i] + vocab.subword_indices(word)
        matrix[i] = matrix[indices].mean(0, keepdims=False)
    n_words = len(vocab)
    word_norms = np.linalg.norm(matrix[:n_words], axis=1)
    matrix[:n_words] /= np.expand_dims(word_norms, axis=1)
    return Embeddings(NdArray(matrix), vocab, Norms(word_norms), metadata)
def _read_ft_header(file: BinaryIO):
    """
    Read and validate the fastText header.

    Reads two little-endian uint32 values, magic and version, and raises
    ValueError if the magic differs from 793_712_314 or the version is
    larger than 12.
    """
    magic, version = _read_binary(file, "<II")
    if magic != 793_712_314:
        raise ValueError(f"Magic should be 793_712_314, not: {magic}")
    if version > 12:
        raise ValueError(f"Expected version 12, not: {version}")


def _read_ft_cfg(file: BinaryIO) -> Metadata:
    """
    Read the fastText config block into Metadata.

    Reads twelve little-endian uint32 values followed by a double. The loss
    and model codes 1 to 3 are mapped to names; other codes are kept as read.
    """
    cfg = _read_binary(file, "<12Id")
    loss_names = {
        1: 'HierarchicalSoftmax',
        2: 'NegativeSampling',
        3: 'Softmax',
    }
    model_names = {1: 'CBOW', 2: 'SkipGram', 3: 'Supervised'}
    loss_code, model_code = cfg[6:8]
    return Metadata({
        'dims': cfg[0],
        'window_size': cfg[1],
        'epoch': cfg[2],
        'min_count': cfg[3],
        'ns': cfg[4],
        'word_ngrams': cfg[5],
        'loss': loss_names.get(loss_code, loss_code),
        'model': model_names.get(model_code, model_code),
        'buckets': cfg[8],
        'min_n': cfg[9],
        'max_n': cfg[10],
        'lr_update_rate': cfg[11],
        'sampling_threshold': cfg[12],
    })


def _read_ft_vocab(file: BinaryIO, buckets: int, min_n: int,
                   max_n: int) -> FastTextVocab:
    """
    Read the fastText vocabulary.

    Each entry is a NUL-terminated utf-8 word, a uint64 that is discarded as
    the frequency, and a one byte entry type which has to be 0. Raises
    NotImplementedError for files with labels or a pruned vocab, EOFError if
    the file ends inside a word and ValueError for non-word entries.
    """
    # the second field (n_words) is discarded
    vocab_size, _, n_labels = _read_binary(file, "<III")
    if n_labels:
        raise NotImplementedError(
            "fastText prediction models are not supported")
    # the first field (n_tokens) is discarded
    _, prune_idx_size = _read_binary(file, "<Qq")
    if prune_idx_size > 0:
        raise NotImplementedError("Pruned vocabs are not supported")
    words = []
    for _ in range(vocab_size):
        word = bytearray()
        # Consume single bytes up to (and including) the terminating NUL.
        for byte in iter(lambda: file.read(1), b'\x00'):
            if byte == b'':
                raise EOFError
            word.extend(byte)
        words.append(word.decode("utf8"))
        _read_binary(file, "<Q")  # discard frequency
        entry_type = _read_binary(file, "<B")[0]
        if entry_type != 0:
            raise ValueError("Non word entry", word)
    return FastTextVocab(words, FastTextIndexer(buckets, min_n, max_n))


# Chunk readers for the vocab chunk identifiers, used by load_finalfusion.
_VOCAB_READERS = {
    ChunkIdentifier.SimpleVocab: SimpleVocab.read_chunk,
    ChunkIdentifier.BucketSubwordVocab: FinalfusionBucketVocab.read_chunk,
    ChunkIdentifier.FastTextSubwordVocab: FastTextVocab.read_chunk,
    ChunkIdentifier.ExplicitSubwordVocab: ExplicitVocab.read_chunk,
}

# Loaders for the storage chunk identifiers, called with (file, mmap).
_STORAGE_READERS = {
    ChunkIdentifier.NdArray: NdArray.load,
    ChunkIdentifier.QuantizedArray: QuantizedArray.load,
}

__all__ = [
    'Embeddings', 'load_finalfusion', 'load_fastText', 'load_word2vec',
    'load_textdims', 'load_text'
]