Files
cult-scraper/apps/cluster_map/clustering.py
2025-08-11 02:37:21 +01:00

100 lines
4.3 KiB
Python

"""
Clustering algorithms and evaluation metrics.
"""
import numpy as np
import streamlit as st
from sklearn.cluster import SpectralClustering, AgglomerativeClustering, OPTICS
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score, calinski_harabasz_score
import hdbscan
from config import DEFAULT_RANDOM_STATE
def _build_clusterer(clustering_method, n_samples, n_clusters):
    """Return an unfitted estimator for ``clustering_method``, or None if the name is unknown."""
    if clustering_method == "HDBSCAN":
        # Adaptive minimum cluster size: 1/20 of the samples, never below 2.
        return hdbscan.HDBSCAN(
            min_cluster_size=max(2, n_samples // 20),
            min_samples=1,
            cluster_selection_epsilon=0.5,
        )
    if clustering_method == "Spectral Clustering":
        return SpectralClustering(
            n_clusters=n_clusters,
            random_state=DEFAULT_RANDOM_STATE,
            affinity='rbf',
            gamma=1.0,
        )
    if clustering_method == "Gaussian Mixture":
        return GaussianMixture(
            n_components=n_clusters,
            random_state=DEFAULT_RANDOM_STATE,
            covariance_type='full',
            max_iter=200,
        )
    if clustering_method == "Agglomerative (Ward)":
        return AgglomerativeClustering(n_clusters=n_clusters, linkage='ward')
    if clustering_method == "Agglomerative (Complete)":
        return AgglomerativeClustering(n_clusters=n_clusters, linkage='complete')
    if clustering_method == "OPTICS":
        # Adaptive neighbourhood size: 1/50 of the samples, never below 2.
        return OPTICS(
            min_samples=max(2, n_samples // 50),
            xi=0.05,
            min_cluster_size=0.1,
        )
    return None


def _score_clustering(scaled_embeddings, cluster_labels):
    """Return (silhouette, calinski_harabasz) over non-noise points, or (None, None)."""
    labels = np.asarray(cluster_labels)
    # Label -1 marks noise (HDBSCAN/OPTICS); noise points are excluded from scoring.
    keep = labels != -1
    valid_labels = labels[keep]
    n_distinct = len(np.unique(valid_labels))
    # Both scores are only defined for 2 <= n_labels <= n_samples - 1; outside
    # that range scikit-learn raises ValueError, so report "no score" instead.
    if n_distinct < 2 or n_distinct >= len(valid_labels):
        return None, None
    valid_embeddings = scaled_embeddings[keep]
    return (
        silhouette_score(valid_embeddings, valid_labels),
        calinski_harabasz_score(valid_embeddings, valid_labels),
    )


def apply_clustering(embeddings, clustering_method="None", n_clusters=5):
    """
    Apply a clustering algorithm to embeddings and return labels and metrics.

    The embeddings are standardized with ``StandardScaler`` before clustering,
    and the quality metrics are computed on the standardized data with noise
    points (label -1) removed.

    Args:
        embeddings: High-dimensional embeddings to cluster (one row per sample).
        clustering_method: One of "None", "HDBSCAN", "Spectral Clustering",
            "Gaussian Mixture", "Agglomerative (Ward)",
            "Agglomerative (Complete)" or "OPTICS". Any other name yields
            ``(None, None, None)``.
        n_clusters: Number of clusters (for methods that require it). Note that
            the "too few samples" guard below compares against this value for
            every method, including HDBSCAN and OPTICS which do not use it.

    Returns:
        tuple: (cluster_labels, silhouette_score, calinski_harabasz_score).
            All three are None when clustering is skipped or fails. The two
            scores are None when they are undefined for the labelling (fewer
            than two clusters, or every retained point in its own cluster) or
            when computing them fails; the labels are still returned then.
    """
    if clustering_method == "None" or len(embeddings) <= n_clusters:
        return None, None, None

    # Standardize embeddings for better clustering
    scaled_embeddings = StandardScaler().fit_transform(embeddings)

    # Phase 1: clustering. Any estimator failure is surfaced in the UI and
    # reported to the caller as "no result".
    try:
        clusterer = _build_clusterer(clustering_method, len(embeddings), n_clusters)
        if clusterer is None:
            return None, None, None
        cluster_labels = clusterer.fit_predict(scaled_embeddings)
    except Exception as e:
        st.warning(f"Clustering failed: {str(e)}")
        return None, None, None

    # Phase 2: quality metrics. Kept separate so a scoring problem never
    # discards a labelling that was computed successfully.
    try:
        silhouette_avg, calinski_harabasz = _score_clustering(
            scaled_embeddings, cluster_labels
        )
    except Exception as e:
        st.warning(f"Cluster quality metrics unavailable: {str(e)}")
        silhouette_avg, calinski_harabasz = None, None

    return cluster_labels, silhouette_avg, calinski_harabasz
def get_cluster_statistics(cluster_labels):
    """Get basic statistics about clustering results.

    Args:
        cluster_labels: Array-like of integer cluster labels in which ``-1``
            marks noise, or None when no clustering is available.

    Returns:
        dict: Empty when ``cluster_labels`` is None. Otherwise:
            - "n_clusters": number of distinct labels other than -1.
            - "n_noise_points": number of points labelled -1.
            - "cluster_distribution": ``np.bincount`` of the non-noise labels
              (entry ``i`` is the number of points with label ``i``), or an
              empty list when there is no non-noise cluster.
            - "unique_clusters": sorted distinct labels, including -1 if present.
    """
    if cluster_labels is None:
        return {}

    # Coerce once so plain lists/tuples get element-wise comparison and
    # boolean-mask indexing instead of a scalar ``False`` / ``list[True]``.
    labels = np.asarray(cluster_labels)
    noise_mask = labels == -1

    unique_clusters = np.unique(labels)
    n_clusters = int(np.count_nonzero(unique_clusters != -1))  # Exclude noise cluster (-1)

    return {
        "n_clusters": n_clusters,
        "n_noise_points": int(np.count_nonzero(noise_mask)),
        "cluster_distribution": np.bincount(labels[~noise_mask]) if n_clusters > 0 else [],
        "unique_clusters": unique_clusters,
    }