from scipy.spatial import distance
from typing import Optional, Union
from sklearn.base import BaseEstimator
from sklearn.utils import check_random_state
import numpy as np
from scipy.optimize import minimize
from cblearn import utils
from cblearn.embedding._base import TripletEmbeddingMixin
from cblearn.embedding._torch_utils import assert_torch_is_available, torch_minimize
[docs]
class STE(BaseEstimator, TripletEmbeddingMixin):
""" Stochastic Triplet Embedding algorithm (STE / t-STE).
STE [1]_ maximizes the probability, that the triplets are satisfied.
The variant t-STE is using the heavy tailed Student-t-kernel instead of a Gaussian kernel.
This estimator supports multiple implementations which can be selected by the `backend` parameter.
The *scipy* backend uses the L-BSGS-B optimizer.
The *torch* backend uses the ADAM optimizer and backpropagation [2]_.
It can executed on CPU, but also CUDA GPUs.
.. note::
The *torch* backend requires the *pytorch* python package (see :ref:`extras_install`).
Attributes:
embedding_: Final embedding, shape (n_objects, n_components)
stress_: Final value of the SOE stress corresponding to the embedding.
n_iter_: Final number of optimization steps.
Examples:
>>> from cblearn import datasets
>>> seed = np.random.RandomState(42)
>>> true_embedding = seed.rand(15, 2)
>>> triplets = datasets.make_random_triplets(true_embedding, result_format='list-order',
... size=1000, random_state=seed)
>>> triplets.shape, np.unique(triplets).shape
((1000, 3), (15,))
>>> estimator = STE(n_components=2, random_state=seed)
>>> embedding = estimator.fit_transform(triplets, n_objects=15)
>>> embedding.shape
(15, 2)
>>> estimator.score(triplets) > 0.8
True
The following is running on the CUDA GPU, if available (but requires pytorch installed).
>>> estimator = STE(n_components=2, backend="torch", random_state=seed)
>>> embedding = estimator.fit_transform(triplets, n_objects=15)
>>> estimator.score(triplets) > 0.8
True
References
----------
.. [1] van der Maaten, L., & Weinberger, K. (2012). Stochastic triplet embedding.
2012 IEEE International Workshop on Machine Learning for Signal Processing, 1–6.
.. [2] Vankadara, L. C., Haghiri, S., Lohaus, M., Wahab, F. U., & von Luxburg, U. (2020).
Insights into Ordinal Embedding Algorithms: A Systematic Evaluation. ArXiv:1912.01666 [Cs, Stat].
"""
[docs]
def __init__(self, n_components=2, heavy_tailed=False, verbose=False,
random_state: Union[None, int, np.random.RandomState] = None, max_iter=1000,
backend: str = "scipy", learning_rate=1, batch_size=50_000, device: str = "auto"):
""" Initialize the estimator.
Args:
n_components :
The dimension of the embedding.
heavy_tailed:
If false, STE is using the Gaussian kernel,
If true, t-STE is using the heavy-tailed student-t kernel.
verbose: boolean, default=False
Enable verbose output.
random_state:
The seed of the pseudo random number generator used to initialize the optimization.
max_iter:
Maximum number of optimization iterations.
backend: The backend used to optimize the objective. {"scipy", "torch"}
learning_rate: Learning rate of the gradient-based optimizer.
If None, then 100 is used, or 1 if kernel=True.
Only used with *torch* backend, else ignored.
batch_size: Batch size of stochastic optimization. Only used with the *torch* backend, else ignored.
device: The device on which pytorch computes. {"auto", "cpu", "cuda"}
"auto" chooses cuda (GPU) if available, but falls back on cpu if not.
Only used with the *torch* backend, else ignored.
"""
self.n_components = n_components
self.heavy_tailed = heavy_tailed
self.max_iter = max_iter
self.verbose = verbose
self.random_state = random_state
self.backend = backend
self.learning_rate = learning_rate
self.batch_size = batch_size
self.device = device
[docs]
def fit(self, X: utils.Query, y: np.ndarray = None, init: np.ndarray = None,
n_objects: Optional[int] = None) -> 'STE':
"""Computes the embedding.
Args:
X: The training input samples, shape (n_samples, 3)
y: Ignored
init: Initial embedding for optimization
n_objects: Number of objects in the embedding.
Returns:
self.
"""
self.fit_X_ = utils.check_query(X, result_format='list-order') # for data validation in .transform
triplets = utils.check_query_response(X, y, result_format='list-order')
self.n_features_in_ = 3
if not n_objects:
n_objects = triplets.max() + 1
random_state = check_random_state(self.random_state)
if init is None:
init = random_state.multivariate_normal(np.zeros(self.n_components), np.eye(self.n_components),
size=n_objects)
if self.backend == "torch":
assert_torch_is_available()
result = torch_minimize('adam', _ste_x_torch, init, data=(triplets.astype(int),), args=(self.heavy_tailed,),
device=self.device, max_iter=self.max_iter, lr=self.learning_rate,
seed=random_state.randint(1))
elif self.backend == "scipy":
result = minimize(_ste_x_grad, init.ravel(), args=(init.shape, triplets, self.heavy_tailed),
method='L-BFGS-B', jac=True, options=dict(maxiter=self.max_iter, disp=self.verbose))
else:
raise ValueError(f"Unknown backend '{self.backend}'. Try 'scipy' or 'torch' instead.")
if self.verbose and not result.success:
print(f"STE's optimization failed with reason: {result.message}.")
self.embedding_ = result.x.reshape(-1, self.n_components)
self.stress_, self.n_iter_ = result.fun, result.nit
return self
def _ste_x_torch(embedding, triplets, heavy_tailed, p=2.):
import torch # Pytorch is an optional dependency
X = embedding[triplets.long()]
dof = max(embedding.shape[1] - 1, 1)
I, J, K = X[:, 0, :], X[:, 1, :], X[:, 2, :]
dist_1 = torch.linalg.vector_norm(I - J, ord=p, dim=1)**2
dist_2 = torch.linalg.vector_norm(I - K, ord=p, dim=1)**2
if heavy_tailed:
t_dist_1 = (1 + dist_1 / 2.)**(-(dof + 1) / 2)
loss = t_dist_1 / (t_dist_1 + (1 + dist_2 / 2.)**(-(dof + 1) / 2) + 1e-16)
else:
loss = (-dist_1).exp() / ((-dist_1).exp() + (-dist_2).exp() + 1e-16)
return -loss.log().sum()
def _ste_x_grad(x, x_shape, triplets, heavy_tailed):
X = x.reshape(x_shape) # scipy minimize expects a flat x.
n_objects, n_dim = X.shape
dof = max(n_dim - 1, 1)
dist = distance.squareform(distance.pdist(X, 'sqeuclidean'))
if heavy_tailed:
base_kernel = 1 + dist / dof
kernel = base_kernel**(-(dof + 1) / 2)
else:
kernel = np.exp(-dist)
I, J, K = tuple(triplets.T)
P = kernel[I, J] / (kernel[I, J] + kernel[I, K] + 1e-12)
loss = -np.log(np.maximum(P, np.finfo(float).tiny)).sum()
if heavy_tailed:
base_inv = (1 / base_kernel)[..., np.newaxis]
grad_triplets = - (dof + 1) / dof * np.array([
base_inv[I, J] * (X[I] - X[J]) - base_inv[I, K] * (X[I] - X[K]),
- base_inv[I, J] * (X[I] - X[J]),
base_inv[I, K] * (X[I] - X[K])])
else:
grad_triplets = - 2 * np.array([
(X[I] - X[J]) - (X[I] - X[K]),
- (X[I] - X[J]),
(X[I] - X[K])])
grad_triplets *= (P * (1 - P))[np.newaxis, :, np.newaxis]
loss_grad = np.empty_like(X)
for dim in range(X.shape[1]):
loss_grad[:, dim] = np.bincount(triplets[:, 0], grad_triplets[0, :, dim], n_objects)
loss_grad[:, dim] += np.bincount(triplets[:, 1], grad_triplets[1, :, dim], n_objects)
loss_grad[:, dim] += np.bincount(triplets[:, 2], grad_triplets[2, :, dim], n_objects)
loss_grad = -loss_grad
return loss, loss_grad.ravel()
[docs]
class TSTE(STE):
""" t-Distributed Stochastic Triplet Embedding (t-STE)
Variant of :class:`STE`, that assumes t-student distributed distances
which leads to better optimization properties."""
[docs]
def __init__(self, n_components=2, verbose=False,
random_state: Union[None, int, np.random.RandomState] = None, max_iter=1000,
backend: str = "scipy", learning_rate=1, batch_size=50_000, device: str = "auto"):
heavy_tailed = True
return super().__init__(n_components, heavy_tailed, verbose, random_state, max_iter, backend,
learning_rate, batch_size, device)