"""
Finalfusion Embeddings
"""
from os import PathLike
from typing import Optional, Union, Tuple, List, BinaryIO
import numpy as np
from ffp.io import Chunk, ChunkIdentifier, Header, _read_binary, _read_chunk_header,\
FinalfusionFormatError
from ffp.metadata import Metadata
from ffp.norms import Norms
from ffp.storage import Storage, NdArray, QuantizedArray
from ffp.subwords import FastTextIndexer
from ffp.vocab import Vocab, FastTextVocab, FinalfusionBucketVocab, SimpleVocab, ExplicitVocab
class Embeddings:  # pylint: disable=too-many-instance-attributes
    """
    Embeddings class.

    Embeddings always contain a :class:`~ffp.storage.Storage` and a
    :class:`~ffp.vocab.Vocab`. Optional chunks are :class:`~ffp.norms.Norms`
    corresponding to the embeddings of the in-vocab tokens and
    :class:`~ffp.metadata.Metadata`.

    Embeddings can be retrieved through three methods:

    1. :meth:`Embeddings.embedding` allows to provide a default value and returns
       this value if no embedding could be found.
    2. :meth:`Embeddings.__getitem__` retrieves an embedding for the query but
       raises an exception if it cannot retrieve an embedding.
    3. :meth:`Embeddings.embedding_with_norm` requires a :class:`~ffp.norms.Norms`
       chunk and returns an embedding together with the corresponding L2 norm.

    Embeddings are composed of the 4 chunk types:

    1. :class:`~ffp.storage.Storage`: either :class:`~ffp.storage.ndarray.NdArray` or
       :class:`~ffp.storage.quantized.QuantizedArray` *(required)*
    2. :class:`~ffp.vocab.Vocab`, one of :class:`~ffp.vocab.simple_vocab.SimpleVocab`,
       :class:`~ffp.vocab.subword.FinalfusionBucketVocab`,
       :class:`~ffp.vocab.subword.FastTextVocab` and :class:`~ffp.vocab.subword.ExplicitVocab`
       *(required)*
    3. :class:`~ffp.norms.Norms`
    4. :class:`~ffp.metadata.Metadata`

    Examples
    --------
    >>> storage = NdArray(np.float32(np.random.rand(2, 10)))
    >>> vocab = SimpleVocab(["Some", "words"])
    >>> metadata = Metadata({"Some": "value", "numerical": 0})
    >>> norms = Norms(np.float32(np.random.rand(2)))
    >>> embeddings = Embeddings(storage=storage, vocab=vocab, metadata=metadata, norms=norms)
    >>> embeddings.vocab.words
    ['Some', 'words']
    >>> np.allclose(embeddings["Some"], storage[0])
    True
    >>> try:
    ...     embeddings["oov"]
    ... except KeyError:
    ...     True
    True
    >>> _, n = embeddings.embedding_with_norm("Some")
    >>> np.isclose(n, norms[0])
    True
    >>> embeddings.metadata
    {'Some': 'value', 'numerical': 0}
    """
    def __init__(self,
                 storage: Storage,
                 vocab: Vocab,
                 norms: Optional[Norms] = None,
                 metadata: Optional[Metadata] = None):
        """
        Initialize Embeddings.

        Initializes Embeddings with the given chunks.

        :Conditions:
            The following conditions need to hold if the respective chunks are passed.

            * Chunks need to have the expected type.
            * ``vocab.idx_bound == storage.shape[0]``
            * ``len(vocab) == len(norms)``
            * ``storage.shape[0] >= len(norms)``

        Parameters
        ----------
        storage : Storage
            Embeddings Storage.
        vocab : Vocab
            Embeddings Vocabulary.
        norms : Norms, optional
            Embeddings Norms.
        metadata : Metadata, optional
            Embeddings Metadata.

        Raises
        ------
        AssertionError
            If any of the conditions don't hold.
        """
        Embeddings._check_requirements(storage, vocab, norms, metadata)
        self._storage = storage
        self._vocab = vocab
        self._norms = norms
        self._metadata = metadata

    def __getitem__(self, item: str) -> np.ndarray:
        """
        Returns an embedding.

        Parameters
        ----------
        item : str
            The query item.

        Returns
        -------
        embedding : numpy.ndarray
            The embedding.

        Raises
        ------
        KeyError
            If no embedding could be retrieved.

        See Also
        --------
        :func:`~Embeddings.embedding`
        :func:`~Embeddings.embedding_with_norm`
        """
        # no need to check for none since Vocab raises KeyError if it can't produce indices
        idx = self._vocab[item]
        return self._embedding(idx)[0]

    def embedding(self,
                  word: str,
                  out: Optional[np.ndarray] = None,
                  default: Optional[np.ndarray] = None
                  ) -> Optional[np.ndarray]:
        """
        Embedding lookup.

        Looks up the embedding for the input word.

        If an `out` array is specified, the embedding is written into the array.

        If it is not possible to retrieve an embedding for the input word, the `default`
        value is returned. This defaults to `None`. An embedding can not be retrieved if
        the vocabulary cannot provide an index for `word`.

        This method never fails. If you do not provide a default value, check the return value
        for None. ``out`` is left untouched if no embedding can be found and ``default`` is None.

        Parameters
        ----------
        word : str
            The query word.
        out : numpy.ndarray, optional
            Optional output array to write the embedding into.
        default: numpy.ndarray, optional
            Optional default value to return if no embedding can be retrieved. Defaults to None.

        Returns
        -------
        embedding : numpy.ndarray, optional
            The retrieved embedding or the default value.

        Examples
        --------
        >>> matrix = np.float32(np.random.rand(2, 10))
        >>> storage = NdArray(matrix)
        >>> vocab = SimpleVocab(["Some", "words"])
        >>> embeddings = Embeddings(storage=storage, vocab=vocab)
        >>> np.allclose(embeddings.embedding("Some"), matrix[0])
        True
        >>> # default value is None
        >>> embeddings.embedding("oov") is None
        True
        >>> # It's possible to specify a default value
        >>> default = embeddings.embedding("oov", default=storage[0])
        >>> np.allclose(default, storage[0])
        True
        >>> # Embeddings can be written to an output buffer.
        >>> out = np.zeros(10, dtype=np.float32)
        >>> out2 = embeddings.embedding("Some", out=out)
        >>> out is out2
        True
        >>> np.allclose(out, matrix[0])
        True

        See Also
        --------
        :func:`~Embeddings.embedding_with_norm`
        :func:`~Embeddings.__getitem__`
        """
        idx = self._vocab.idx(word)
        if idx is None:
            # No index for the word: fall back to the default, copying it into
            # the caller's buffer when both a buffer and a default were given.
            if out is not None and default is not None:
                out[:] = default
                return out
            return default
        return self._embedding(idx, out)[0]

    def embedding_with_norm(self,
                            word: str,
                            out: Optional[np.ndarray] = None,
                            default: Optional[Tuple[np.ndarray, float]] = None
                            ) -> Optional[Tuple[np.ndarray, float]]:
        """
        Embedding lookup with norm.

        Looks up the embedding for the input word together with its norm.

        If an `out` array is specified, the embedding is written into the array.

        If it is not possible to retrieve an embedding for the input word, the `default`
        value is returned. This defaults to `None`. An embedding can not be retrieved if
        the vocabulary cannot provide an index for `word`.

        Parameters
        ----------
        word : str
            The query word.
        out : numpy.ndarray, optional
            Optional output array to write the embedding into.
        default: Tuple[numpy.ndarray, float], optional
            Optional default value to return if no embedding can be retrieved. Defaults to None.

        Returns
        -------
        (embedding, norm) : EmbeddingWithNorm, optional
            Tuple with the retrieved embedding or the default value at the first index and the
            norm at the second index.

        Raises
        ------
        TypeError
            If the embeddings don't contain a norms chunk.

        See Also
        --------
        :func:`~Embeddings.embedding`
        :func:`~Embeddings.__getitem__`
        """
        if self._norms is None:
            raise TypeError("embeddings don't contain norms chunk")
        idx = self._vocab.idx(word)
        if idx is None:
            # Mirror ``embedding``: write the default vector into ``out`` and
            # pair the buffer with the default's norm.
            if out is not None and default is not None:
                out[:] = default[0]
                return out, default[1]
            return default
        return self._embedding(idx, out)

    @property
    def storage(self) -> Optional[Storage]:
        """
        Get the :class:`~ffp.storage.Storage`.

        Returns
        -------
        storage : Storage
            The embeddings storage.
        """
        return self._storage

    @property
    def vocab(self) -> Optional[Vocab]:
        """
        The :class:`~ffp.vocab.Vocab`.

        Returns
        -------
        vocab : Vocab
            The vocabulary
        """
        return self._vocab

    @property
    def norms(self) -> Optional[Norms]:
        """
        The :class:`~ffp.norms.Norms`.

        :Getter: Returns None or the Norms.
        :Setter: Set the Norms.

        Returns
        -------
        norms : Norms, optional
            The Norms or None.

        Raises
        ------
        AssertionError
            On assignment, if the new value is neither ``None`` nor a ``Norms``
            instance, if ``embeddings.storage.shape[0] < len(norms)`` or if
            ``len(norms) != len(embeddings.vocab)``.
        """
        return self._norms

    @norms.setter
    def norms(self, norms: Optional[Norms]):
        if norms is None:
            self._norms = None
        else:
            # Validate the *incoming* norms (not the currently stored ones)
            # against the storage and vocab before replacing the old value.
            Embeddings._norms_compat(self.storage, self.vocab, norms)
            self._norms = norms

    @property
    def metadata(self) -> Optional[Metadata]:
        """
        The :class:`~ffp.metadata.Metadata`.

        :Getter: Returns None or the Metadata.
        :Setter: Set the Metadata.

        Returns
        -------
        metadata : Metadata, optional
            The Metadata or None.

        Raises
        ------
        TypeError
            If ``metadata`` is neither Metadata nor None.
        """
        return self._metadata

    @metadata.setter
    def metadata(self, metadata: Optional[Metadata]):
        if metadata is None:
            self._metadata = None
        elif isinstance(metadata, Metadata):
            self._metadata = metadata
        else:
            raise TypeError("Expected 'None' or 'Metadata'.")

    def bucket_to_explicit(self) -> 'Embeddings':
        """
        Convert bucket embeddings to embeddings with explicit lookup.

        Multiple embeddings can still map to the same bucket, but all buckets that are not
        indexed by in-vocabulary n-grams are eliminated. This can have a big impact on the
        size of the embedding matrix.

        A side effect of this method is the conversion from a quantized storage to an
        array storage. The returned embeddings are constructed from the new vocab and
        storage only; norms and metadata are not passed on.

        Returns
        -------
        embeddings : Embeddings
            Embeddings with an ExplicitVocab instead of a hash-based vocabulary.

        Raises
        ------
        TypeError
            If the current vocabulary is not a hash-based vocabulary
            (FinalfusionBucketVocab or FastTextVocab)
        """
        bucket_vocabs = (FastTextVocab, FinalfusionBucketVocab)
        if not isinstance(self._vocab, bucket_vocabs):
            raise TypeError(
                "Only bucketed embeddings can be converted to explicit.")
        vocab = self._vocab.to_explicit()
        storage = np.zeros((vocab.idx_bound, self._storage.shape[1]),
                           dtype=np.float32)
        # The first len(vocab) rows are copied over unchanged.
        storage[:len(vocab)] = self._storage[:len(vocab)]
        # Each n-gram's row moves from its bucket index in the old vocab to
        # its index in the explicit vocab; both are offset by len(vocab).
        for ngram in vocab.subword_indexer:
            storage[len(vocab) + vocab.subword_indexer(ngram)] = self._storage[
                len(vocab) + self._vocab.subword_indexer(ngram)]
        return Embeddings(vocab=vocab, storage=NdArray(storage))

    def chunks(self) -> List[Chunk]:
        """
        Get the Embeddings Chunks as a list.

        The Chunks are ordered in the expected serialization order:

        1. Metadata
        2. Vocabulary
        3. Storage
        4. Norms

        Metadata and Norms are only included if they are set.

        Returns
        -------
        chunks : List[Chunk]
            List of embeddings chunks.
        """
        chunks = []
        if self._metadata is not None:
            chunks.append(self.metadata)
        chunks.append(self.vocab)
        chunks.append(self.storage)
        if self._norms is not None:
            chunks.append(self.norms)
        return chunks

    def write(self, file: str):
        """
        Write the Embeddings to the given file.

        Writes a header listing the chunk identifiers, followed by every chunk
        returned by :meth:`Embeddings.chunks`, to a finalfusion file at the given path.

        Parameters
        ----------
        file : str
            Path of the output file.
        """
        with open(file, 'wb') as outf:
            chunks = self.chunks()
            header = Header([chunk.chunk_identifier() for chunk in chunks])
            header.write_chunk(outf)
            for chunk in chunks:
                chunk.write_chunk(outf)

    def __contains__(self, item):
        """Membership test, delegated to the vocabulary."""
        return item in self._vocab

    def __iter__(self):
        """
        Iterate over the embeddings.

        Zips the vocabulary's words with the storage and, if norms are set,
        with the norms.
        """
        if self._norms is not None:
            return zip(self._vocab.words, self._storage, self._norms)
        return zip(self._vocab.words, self._storage)

    def _embedding(self,
                   idx: Union[int, List[int]],
                   out: Optional[np.ndarray] = None
                   ) -> Tuple[np.ndarray, Optional[float]]:
        """
        Look up the embedding (and norm) for the given index or indices.

        If the storage lookup yields a 1-d array, that array is the embedding:
        it is copied into ``out`` if given, and the norm is taken from the norms
        chunk (``None`` if there are no norms). Otherwise the looked-up rows are
        summed along the first axis, the sum is divided by its L2 norm in place,
        and that norm is returned.
        """
        res = self._storage[idx]
        if res.ndim == 1:
            if out is not None:
                out[:] = res
            else:
                out = res
            if self._norms is not None:
                norm = self._norms[idx]
            else:
                norm = None
        else:
            out = np.add.reduce(res, 0, out=out, keepdims=False)
            norm = np.linalg.norm(out)
            out /= norm
        return out, norm

    @staticmethod
    def _check_requirements(storage: Storage, vocab: Vocab,
                            norms: Optional[Norms],
                            metadata: Optional[Metadata]):
        """Assert that the chunks have the expected types and compatible sizes."""
        assert isinstance(storage, Storage),\
            "storage is required to be a Storage"
        assert isinstance(vocab, Vocab), "vocab is required to be a Vocab"
        assert storage.shape[0] == vocab.idx_bound,\
            "Number of embeddings needs to be equal to vocab's idx_bound"
        if norms is not None:
            Embeddings._norms_compat(storage, vocab, norms)
        assert metadata is None or isinstance(metadata, Metadata),\
            "metadata is required to be Metadata"

    @staticmethod
    def _norms_compat(storage: Storage, vocab: Vocab, norms: Norms):
        """Assert that ``norms`` is a Norms chunk that fits ``storage`` and ``vocab``."""
        assert isinstance(norms, Norms), "norms are required to be Norms"
        assert storage.shape[0] >= len(norms),\
            "Number of embeddings needs to be greater than or equal to number of norms."
        assert len(vocab) == len(norms),\
            "Vocab length needs to be equal to number of norms."
def load_finalfusion(file: Union[str, bytes, int, PathLike],
                     mmap: bool = False) -> Embeddings:
    """
    Read embeddings from a file in finalfusion format.

    The chunks are expected in the order: optional metadata, vocab, storage,
    optional norms.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in finalfusion format.
    mmap : bool
        Toggles memory mapping the storage buffer.

    Returns
    -------
    embeddings : Embeddings
        The embeddings from the input file.

    Raises
    ------
    FinalfusionFormatError
        If a chunk of an unexpected type is found where the vocab, storage
        or norms chunk is expected.
    """
    with open(file, 'rb') as inf:
        _ = Header.read_chunk(inf)
        chunk_id, _ = _read_chunk_header(inf)
        norms = None
        metadata = None

        # Metadata is optional and, if present, precedes the vocab.
        if chunk_id == ChunkIdentifier.Metadata:
            metadata = Metadata.read_chunk(inf)
            chunk_id, _ = _read_chunk_header(inf)

        if chunk_id.is_vocab():
            vocab = _VOCAB_READERS[chunk_id](inf)
        else:
            raise FinalfusionFormatError(
                f'Expected vocab chunk, not {str(chunk_id)}')

        chunk_id, _ = _read_chunk_header(inf)
        if chunk_id.is_storage():
            storage = _STORAGE_READERS[chunk_id](inf, mmap)
        else:
            raise FinalfusionFormatError(
                f'Expected storage chunk, not {str(chunk_id)}')

        # Norms are optional: a missing header (``None``) means there is no
        # further chunk to read.
        norms_header = _read_chunk_header(inf)
        if norms_header is not None:
            if norms_header[0] == ChunkIdentifier.NdNorms:
                norms = Norms.read_chunk(inf)
            else:
                raise FinalfusionFormatError(
                    f'Expected norms chunk, not {str(norms_header[0])}')

        return Embeddings(storage=storage,
                          vocab=vocab,
                          norms=norms,
                          metadata=metadata)
def load_word2vec(file: Union[str, bytes, int, PathLike]) -> Embeddings:
    """
    Read embeddings in word2vec binary format.

    Files are expected to start with a line containing rows and cols in utf-8. Words are encoded
    in utf-8 followed by a single whitespace. After the whitespace the embedding components are
    expected as little-endian float32.

    The embeddings are l2-normalized; the original norms are stored in the
    Norms chunk of the returned Embeddings.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in word2vec binary format.

    Returns
    -------
    embeddings : Embeddings
        The embeddings from the input file.

    Raises
    ------
    EOFError
        If the file ends in the middle of a word or of a vector.
    """
    words = []
    with open(file, 'rb') as inf:
        rows, cols = map(int, inf.readline().decode("utf-8").split())
        matrix = np.zeros((rows, cols), dtype=np.float32)
        row_bytes = cols * matrix.itemsize
        for row in range(rows):
            word = []
            while True:
                byte = inf.read(1)
                if byte == b' ':
                    break
                if byte == b'':
                    raise EOFError
                # Newlines separating the previous vector from this word are
                # not part of the word.
                if byte != b'\n':
                    word.append(byte)
            word = b''.join(word).decode('utf-8')
            words.append(word)
            vec = inf.read(row_bytes)
            # A short read means the file is truncated; without this check a
            # single leftover float would be broadcast across the whole row.
            if len(vec) != row_bytes:
                raise EOFError(
                    f"Expected {row_bytes} bytes for the embedding of row {row}, "
                    f"got {len(vec)}")
            # The format stores little-endian floats regardless of host order.
            matrix[row] = np.frombuffer(vec, dtype='<f4')
    norms = np.linalg.norm(matrix, axis=1)
    matrix /= np.expand_dims(norms, axis=1)
    return Embeddings(storage=NdArray(matrix),
                      norms=Norms(norms),
                      vocab=SimpleVocab(words))
def load_textdims(file: Union[str, bytes, int, PathLike]) -> Embeddings:
    """
    Read embeddings in textdims format.

    The first line contains whitespace separated rows and cols, the rest of the file contains
    whitespace separated word and vector components.

    The embeddings are l2-normalized; the original norms are stored in the
    Norms chunk of the returned Embeddings.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in textdims format.

    Returns
    -------
    embeddings : Embeddings
        The embeddings from the input file.
    """
    words = []
    # Decode explicitly as UTF-8 instead of relying on the locale's default
    # encoding, which would garble or reject non-ASCII words on some systems.
    with open(file, encoding="utf-8") as inf:
        rows, cols = next(inf).split()
        matrix = np.zeros((int(rows), int(cols)), dtype=np.float32)
        for i, line in enumerate(inf):
            line = line.strip().split()
            words.append(line[0])
            matrix[i] = line[1:]
    norms = np.linalg.norm(matrix, axis=1)
    matrix /= np.expand_dims(norms, axis=1)
    return Embeddings(storage=NdArray(matrix),
                      norms=Norms(norms),
                      vocab=SimpleVocab(words))
def load_text(file: Union[str, bytes, int, PathLike]) -> Embeddings:
    """
    Read embeddings in text format.

    Each line contains a word followed by its whitespace separated vector
    components. The embeddings are l2-normalized; the original norms are
    stored in the Norms chunk of the returned Embeddings.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in text format.

    Returns
    -------
    embeddings : Embeddings
        Embeddings from the input file. The resulting Embeddings will have a
        SimpleVocab, NdArray and Norms.
    """
    words = []
    vecs = []
    # Decode explicitly as UTF-8 instead of relying on the locale's default
    # encoding, which would garble or reject non-ASCII words on some systems.
    with open(file, encoding="utf-8") as inf:
        for line in inf:
            line = line.strip().split()
            words.append(line[0])
            vecs.append(line[1:])
    matrix = np.array(vecs, dtype=np.float32)
    norms = np.linalg.norm(matrix, axis=1)
    matrix /= np.expand_dims(norms, axis=1)
    return Embeddings(storage=NdArray(matrix),
                      norms=Norms(norms),
                      vocab=SimpleVocab(words))
def load_fastText(file: Union[str, bytes, int, PathLike]) -> Embeddings:  # pylint: disable=invalid-name
    """
    Read embeddings from a file in fastText format.

    Each in-vocabulary row is replaced by the mean of the word's own row and
    the rows of its subword indices, and is then l2-normalized. The norms of
    the in-vocabulary rows are stored in the Norms chunk and the model
    configuration in the Metadata chunk of the returned Embeddings.

    Parameters
    ----------
    file : str, bytes, int, PathLike
        Path to a file with embeddings in fastText format.

    Returns
    -------
    embeddings : Embeddings
        The embeddings from the input file.

    Raises
    ------
    NotImplementedError
        If the file contains a quantized matrix.
    """
    with open(file, 'rb') as inf:
        _read_ft_header(inf)
        metadata = _read_ft_cfg(inf)
        vocab = _read_ft_vocab(inf, metadata['buckets'], metadata['min_n'],
                               metadata['max_n'])
        quantized = _read_binary(inf, "<B")[0]
        if quantized:
            raise NotImplementedError(
                "Quantized storage is not supported for fastText models")
        rows, cols = _read_binary(inf, "<QQ")
        # Every other field of the file is parsed as little-endian, so read the
        # matrix as little-endian too and convert to native float32 (a no-op
        # without a copy on little-endian hosts).
        matrix = np.fromfile(file=inf, count=rows * cols, dtype=np.dtype('<f4'))
        matrix = matrix.astype(np.float32, copy=False)
        matrix = np.reshape(matrix, (rows, cols))
    for i, word in enumerate(vocab):
        indices = [i] + vocab.subword_indices(word)
        matrix[i] = matrix[indices].mean(0, keepdims=False)
    norms = np.linalg.norm(matrix[:len(vocab)], axis=1)
    matrix[:len(vocab)] /= np.expand_dims(norms, axis=1)
    storage = NdArray(matrix)
    norms = Norms(norms)
    return Embeddings(storage, vocab, norms, metadata)
def _read_ft_header(file: BinaryIO):
    """
    Read and validate the fastText file header.

    Reads two little-endian unsigned 32-bit integers, the magic number and the
    format version, and raises ``ValueError`` if the magic number does not
    match or the version is greater than 12.
    """
    expected_magic = 793_712_314
    max_version = 12
    magic, version = _read_binary(file, "<II")
    if magic != expected_magic:
        raise ValueError(f"Magic should be 793_712_314, not: {magic}")
    if version > max_version:
        raise ValueError(f"Expected version 12, not: {version}")
def _read_ft_cfg(file: BinaryIO) -> Metadata:
    """
    Read the fastText model configuration into a Metadata chunk.

    The configuration consists of twelve little-endian unsigned 32-bit integers
    followed by one double. The numerical loss and model identifiers are mapped
    to their names; unknown identifiers are kept as they are.
    """
    loss_names = {
        1: 'HierarchicalSoftmax',
        2: 'NegativeSampling',
        3: 'Softmax',
    }
    model_names = {
        1: 'CBOW',
        2: 'SkipGram',
        3: 'Supervised',
    }
    cfg = _read_binary(file, "<12Id")
    loss_id, model_id = cfg[6:8]
    # Fall back to the raw identifier if there is no name for it.
    loss = loss_names.get(loss_id, loss_id)
    model = model_names.get(model_id, model_id)
    return Metadata({
        'dims': cfg[0],
        'window_size': cfg[1],
        'epoch': cfg[2],
        'min_count': cfg[3],
        'ns': cfg[4],
        'word_ngrams': cfg[5],
        'loss': loss,
        'model': model,
        'buckets': cfg[8],
        'min_n': cfg[9],
        'max_n': cfg[10],
        'lr_update_rate': cfg[11],
        'sampling_threshold': cfg[12],
    })
def _read_ft_vocab(file: BinaryIO, buckets: int, min_n: int,
                   max_n: int) -> FastTextVocab:
    """
    Read the fastText vocabulary.

    Reads the vocab header, then for every entry a NUL-terminated utf-8 word,
    a 64-bit frequency (discarded) and a one-byte entry type which has to be 0.

    Raises ``NotImplementedError`` for files with labels or a pruned vocab,
    ``EOFError`` if the file ends inside a word and ``ValueError`` for entries
    whose type is not 0.
    """
    # The number of words (second field) is read but not used.
    vocab_size, _n_words, n_labels = _read_binary(file, "<III")
    if n_labels:
        raise NotImplementedError(
            "fastText prediction models are not supported")
    # The number of tokens (first field) is read but not used.
    _n_tokens, prune_idx_size = _read_binary(file, "<Qq")
    if prune_idx_size > 0:
        raise NotImplementedError("Pruned vocabs are not supported")

    words = []
    for _ in range(vocab_size):
        raw_word = bytearray()
        byte = file.read(1)
        # Collect bytes up to the NUL terminator.
        while byte != b'\x00':
            if byte == b'':
                raise EOFError
            raw_word.extend(byte)
            byte = file.read(1)
        words.append(raw_word.decode("utf8"))
        _read_binary(file, "<Q")  # frequency, not used
        if _read_binary(file, "<B")[0] != 0:
            raise ValueError("Non word entry", raw_word)
    return FastTextVocab(words, FastTextIndexer(buckets, min_n, max_n))
# Maps each vocab chunk identifier to the reader for that vocab type.
# load_finalfusion calls the selected reader with the open file.
_VOCAB_READERS = {
ChunkIdentifier.SimpleVocab: SimpleVocab.read_chunk,
ChunkIdentifier.BucketSubwordVocab: FinalfusionBucketVocab.read_chunk,
ChunkIdentifier.FastTextSubwordVocab: FastTextVocab.read_chunk,
ChunkIdentifier.ExplicitSubwordVocab: ExplicitVocab.read_chunk,
}
# Maps each storage chunk identifier to the loader for that storage type.
# load_finalfusion calls the selected loader with the open file and the mmap flag.
_STORAGE_READERS = {
ChunkIdentifier.NdArray: NdArray.load,
ChunkIdentifier.QuantizedArray: QuantizedArray.load,
}
# Names exported by ``from <module> import *``.
__all__ = [
'Embeddings', 'load_finalfusion', 'load_fastText', 'load_word2vec',
'load_textdims', 'load_text'
]