Source code for cblearn.cluster._comparison_hc

""" ComparisonHC clustering algorithm (Ghoshdastidar, Perrot and Luxburg, 2019)

This implementation is an adaptation of Michael Perrot's library code
with his permission. https://github.com/mperrot/ComparisonHC


Copyright (c) 2022 Alexander Conzelmann, David-Elias Künstle, Michael Perrot

Licensed under the MIT license: https://opensource.org/licenses/MIT
Permission is granted to use, copy, modify, and redistribute the work.
Full license information available in the project LICENSE file.
"""
from typing import List
import sparse
import numpy as np
from sklearn.base import BaseEstimator, ClusterMixin

from cblearn import utils


def flatten(l: List[list]) -> list:
    # sum can essentially be used as a mapReduce / flatMap ;)
    return sum(l, [])


def clusters_from_dendrogram(dendrogram, clusters, k):
    """Cuts a dendrogram of the initial clusters to obtain a partition of
    the space in exactly k clusters.

    If k is higher than the number of initial clusters, the
    initial clusters are returned.

    Args:
        dendrogram: numpy array, shape (n_clusters-1, 3)
                     The dendrogram that should be used to obtain the partition.
        clusters: list of (list of examples)
                  The initial clusters of the dendrogram.
        k: int, The number of clusters in the partition.

    Returns:
        k_clusters: list of (list of examples)
                    The k_clusters that are merged last in the dendrogram.

    """
    n_clusters = len(clusters)
    if k >= n_clusters:
        return clusters

    if k < 2:
        return [[example_i for cluster in clusters for example_i in cluster]]

    k_clusters = [[example_i for example_i in cluster] for cluster in clusters]

    clusters_indices = list(range(n_clusters))
    dendrogram = dendrogram.astype(int)  # with floats, the index-search below fails
    for it in range(n_clusters-k):
        i = clusters_indices.index(dendrogram[it, 0])
        j = clusters_indices.index(dendrogram[it, 1])

        if i > j:
            i, j = j, i

        k_clusters[i].extend(k_clusters[j])
        del k_clusters[j]

        clusters_indices[i] = n_clusters+it
        del clusters_indices[j]

    return k_clusters


def closest_clusters(clusters, comparisons, n_objects: int):
    """Returns the indices of the two clusters that are closest to each
    other in the list.

    Given a list of clusters, this method is deterministic.

    Args: 
        clusters : A list containing at least two clusters.
        comparisons: Quadruplets in sparse format
        n_objects: number of examples in clusters/comparisons

    Returns:
        i: The index of the first of the two closest clusters.  
        j: The index of the second of the two closest clusters.

    """
    n_clusters = len(clusters)

    i, j = None, None

    score_best = -1

    # Prepare the normalization array
    # This is the divisor for each entry in the sum.
    # It depends on the cluster of each example.
    normalization = np.zeros((1, 1, n_objects, n_objects))
    for r in range(n_clusters):
        normalization[0, 0, [np.array(clusters[r]).reshape(-1, 1)],
                      np.isin(np.arange(n_objects), clusters[r], invert=True)] = 1/len(clusters[r])

    for s in range(n_clusters):
        normalization[0, 0, :, clusters[s]] /= len(clusters[s])

    for p in range(n_clusters):
        clusters_p = clusters[p]

        comparisons_p = comparisons[clusters_p, :, :, :]

        n_examples_p = len(clusters_p)

        for q in range(p+1, n_clusters):
            score = 0

            clusters_q = clusters[q]

            comparisons_pq = comparisons_p[:, clusters_q, :, :]

            n_examples_pq = n_examples_p*len(clusters_q)

            # Divide each entry in the matrix by the normalization and sum everything
            score = np.sum(comparisons_pq*normalization)/(n_clusters*(n_clusters-1)*n_examples_pq)

            if score > score_best:
                i, j = p, q
                score_best = score

    return i, j


[docs] class ComparisonHC(ClusterMixin, BaseEstimator): """ComparisonHC. ComparisonHC [1]_ is an hierarchical clustering algorithm that calculates clusters on triplet data without computing an intermediate embedding. This is done via an adapted linkage algorithm that only uses the triplet information. As this is algorithm produces its clusterings via a Dendrogram that is created on the whole dataset, we do not provide a fit method. Call fit_predict directly with the complete dataset you want to do an clustering on. Keep in mind that this algorithm was optimized and developed for hierarchical clustering, and simply adapted to produce a flat clustering with the desired number of clusters. Thus, this algorithm might not have optimal performance in these settings when compared to other approaches. Attributes: dendrogram_: numpy array, shape (n_clusters-1, 4) An array corresponding to the learned dendrogram. After iteration i, dendrogram[i,0] and dendrogram[i,1] are the indices of the merged clusters, and dendrogram[i,2] is the size of the new cluster. The dendrogram is initialized to None until the fit method is called. The last column is set to 0 (implemented like this by the original algorithm). cluster_: list of list Initial cluster information used for fitting. Examples: >>> from sklearn.datasets import make_blobs >>> from sklearn.metrics import normalized_mutual_info_score >>> from cblearn.datasets import make_random_triplets >>> from cblearn.cluster import ComparisonHC >>> import numpy as np >>> means = np.array([[1,0], [-1, 0]]) >>> stds = 0.2 * np.ones(means.shape) >>> xs, ys = make_blobs(n_samples=[10, 10], centers=means, cluster_std=stds, ... n_features=2, random_state=2) >>> estimator = ComparisonHC(2) >>> t = make_random_triplets(xs, result_format="list-order", size=5000, random_state=2) >>> labels = estimator.fit_predict(t) >>> normalized_mutual_info_score(labels, ys) 1.0 References ---------- .. [1] Ghoshdastidar, D., Perrot, M., von Luxburg, U. (2019). Foundations of Comparison-Based Hierarchical Clustering. Advances in Neural Information Processing Systems 32. """
[docs] def __init__(self, n_clusters: int) -> None: """ Initialize the estimator. Args: n_clusters: Number of clusters desired in the final clustering. """ self.n_clusters = n_clusters
def _triplets_to_quadruplets(self, X, y=None): """ ComparisonHC expects the sparse quadruplet representation, where to every entry (1) the inverse (-1), too. E.g. if quad[0, 5, 4, 6] == 1, then quad[4, 6, 0, 5] == -1 """ triplets = utils.check_query_response(X, y, result_format='tensor-count') triplets = triplets.clip(-1, 1) # remove repeated triplets quads = sparse.COO(triplets.coords[[0, 1, 0, 2], :], triplets.data, (len(triplets),) * 4) quads = quads + sparse.COO(triplets.coords[[0, 2, 0, 1], :], -1 * triplets.data, (len(triplets),) * 4) return quads def _fit_dendrogram(self, init_clusters, quadruplets, n_objects): n_clusters = len(init_clusters) # != self.n_clusters, which is number of predicted clusters clusters_indices = list(range(n_clusters)) clusters_copy = [[example for example in cluster] for cluster in init_clusters] dendrogram = np.zeros((n_clusters - 1, 4)) for it in range(n_clusters - 1): i, j = closest_clusters(clusters_copy, quadruplets, n_objects) if i > j: i, j = j, i clusters_copy[i].extend(clusters_copy[j]) del clusters_copy[j] dendrogram[it, 0] = clusters_indices[i] dendrogram[it, 1] = clusters_indices[j] dendrogram[it, 2] = len(clusters_copy[i]) clusters_indices[i] = n_clusters + it del clusters_indices[j] return dendrogram
[docs] def fit(self, X, y=None, init_clusters=None): """Computes the dendrogram of a list of clusters. Args: X: Triplets, repeated responses will be ignored (majority vote) y: optional responses init_clusters: list of (list of examples), len(n_clusters) An optional list containing the initial clusters (list of examples). Returns: self: object Raises: ValueError: If the initial partition has less that n_examples. """ quads = self._triplets_to_quadruplets(X, y) n_objects = len(quads) if init_clusters is None: init_clusters = [[i] for i in range(n_objects)] self.clusters_ = init_clusters self.dendrogram_ = self._fit_dendrogram(init_clusters, quads, n_objects) clusters = clusters_from_dendrogram( self.dendrogram_, self.clusters_, self.n_clusters) labels_in_order = flatten([[i] * len(cluster) for i, cluster in enumerate(clusters)]) labels_for_original = [-1] * len(labels_in_order) for lab, pos in zip(labels_in_order, flatten(clusters)): labels_for_original[pos] = lab self.labels_ = np.array(labels_for_original) return self