"""
|
|
Dimensionality reduction algorithms and point separation techniques.
|
|
"""
import numpy as np
import streamlit as st
import umap
from scipy.optimize import minimize
from scipy.spatial import cKDTree
from scipy.spatial.distance import pdist, squareform
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE, SpectralEmbedding
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler

from config import DEFAULT_RANDOM_STATE
def apply_adaptive_spreading(embeddings, spread_factor=1.0):
    """
    Push nearby points apart while preserving global structure.

    Every pair of points repels with magnitude spread_factor / (dist**2 + 0.01),
    so closer points repel more strongly.  Forces are damped by 0.1 and applied
    for a few iterations.  Inputs with more than 1000 points are returned
    unchanged to avoid the O(n^2) pairwise cost.

    Parameters
    ----------
    embeddings : np.ndarray, shape (n_points, n_dims)
        Layout coordinates to spread.
    spread_factor : float
        Repulsion strength; values <= 0 disable spreading entirely.

    Returns
    -------
    np.ndarray
        Spread coordinates (the caller's array is not modified in place).
    """
    if spread_factor <= 0:
        return embeddings

    embeddings = embeddings.copy()
    n_points = len(embeddings)

    print(f"DEBUG: Applying adaptive spreading to {n_points} points with factor {spread_factor}")

    if n_points < 2:
        return embeddings

    # For very large datasets, skip spreading to avoid hanging
    if n_points > 1000:
        print(f"DEBUG: Large dataset ({n_points} points), skipping adaptive spreading...")
        return embeddings

    # Fewer iterations for large datasets
    max_iterations = 3 if n_points > 500 else 5

    for iteration in range(max_iterations):
        if iteration % 2 == 0:  # Progress indicator
            print(f"DEBUG: Spreading iteration {iteration + 1}/{max_iterations}")

        # Vectorized pairwise repulsion.  The original computed the same
        # forces in an O(n^2) Python double loop, and also built an unused
        # squareform(pdist(...)) distance matrix, which has been removed.
        diffs = embeddings[:, None, :] - embeddings[None, :, :]  # (n, n, d)
        dists = np.linalg.norm(diffs, axis=-1)                   # (n, n)

        # Coincident pairs (dist == 0, including the diagonal) exert no
        # force, as in the original; substituting inf makes both magnitude
        # and direction collapse to zero without division warnings.
        safe_dists = np.where(dists > 0.0, dists, np.inf)
        # Repulsive force inversely proportional to squared distance
        magnitudes = spread_factor / (safe_dists ** 2 + 0.01)    # (n, n)
        directions = diffs / safe_dists[..., None]               # (n, n, d)
        forces = (magnitudes[..., None] * directions).sum(axis=1)

        # Apply forces with damping
        embeddings += forces * 0.1

    print(f"DEBUG: Adaptive spreading complete")
    return embeddings
def force_directed_layout(high_dim_embeddings, n_components=2, spread_factor=1.0):
    """
    Create a low-dimensional layout from high-dimensional embeddings.

    Projects with PCA and then applies adaptive spreading for more natural
    spacing between similar points.  (A true force-directed optimization was
    too computationally intensive; the original code had two byte-identical
    branches — one for > 500 points, one for the rest — that both did exactly
    PCA + spreading, so they are collapsed into a single path here.)

    Parameters
    ----------
    high_dim_embeddings : array-like, shape (n_samples, n_features)
        High-dimensional vectors to lay out.
    n_components : int
        Output dimensionality.
    spread_factor : float
        Passed through to apply_adaptive_spreading.

    Returns
    -------
    np.ndarray, shape (n_samples, n_components)
    """
    print(f"DEBUG: Starting force-directed layout with {len(high_dim_embeddings)} points...")

    # Start with PCA as the initial layout
    pca = PCA(n_components=n_components, random_state=DEFAULT_RANDOM_STATE)
    initial_layout = pca.fit_transform(high_dim_embeddings)
    print(f"DEBUG: Initial PCA layout computed...")

    # apply_adaptive_spreading itself skips inputs > 1000 points, so large
    # datasets remain safe without a separate code path here.
    result = apply_adaptive_spreading(initial_layout, spread_factor)
    print(f"DEBUG: Force-directed layout complete...")
    return result
def calculate_local_density_scaling(embeddings, k=5):
    """
    Calculate local density scaling factors to emphasize differences in
    dense regions.

    Density is the inverse of the mean distance to each point's k nearest
    neighbors, min-max normalized so the sparsest point maps to 0 and the
    densest to ~1.

    Parameters
    ----------
    embeddings : array-like, shape (n_points, n_dims)
        Point coordinates.
    k : int
        Number of nearest neighbors used for the density estimate.

    Returns
    -------
    np.ndarray, shape (n_points,)
        Normalized densities in [0, 1]; all ones when there are too few
        points to estimate density.
    """
    embeddings = np.asarray(embeddings)

    # Need at least k+1 points to query k neighbors besides the point itself.
    # The original guard was `< k`, which crashed when len(embeddings) == k
    # (the k+1-neighbor query then exceeded the sample count).
    if len(embeddings) <= k:
        return np.ones(len(embeddings))

    # Query k+1 neighbors because the nearest "neighbor" of each point is
    # the point itself at distance 0.
    tree = cKDTree(embeddings)
    distances, _ = tree.query(embeddings, k=k + 1)

    # Local density = inverse of average distance to the k nearest neighbors
    local_densities = 1.0 / (np.mean(distances[:, 1:], axis=1) + 1e-6)

    # Min-max normalize densities
    local_densities = (local_densities - np.min(local_densities)) / (
        np.max(local_densities) - np.min(local_densities) + 1e-6
    )

    return local_densities
def apply_density_based_jittering(embeddings, density_scaling=True, jitter_strength=0.1):
    """
    Apply smart jittering that's stronger in dense regions to separate
    overlapping points.

    Parameters
    ----------
    embeddings : array-like, shape (n_points, n_dims)
        Point coordinates; not modified in place.
    density_scaling : bool
        When True, the per-point noise scale is jitter_strength * (1 + density),
        with density from calculate_local_density_scaling.  When False, uniform
        Gaussian noise with std jitter_strength is used.
    jitter_strength : float
        Base standard deviation of the jitter noise.

    Returns
    -------
    np.ndarray
        Jittered copy of the input.  Uses the global NumPy RNG, so results
        are only reproducible if the caller seeds np.random.
    """
    embeddings = np.asarray(embeddings)

    if not density_scaling:
        # Simple uniform random jittering
        noise = np.random.normal(0.0, jitter_strength, embeddings.shape)
        return embeddings + noise

    # Calculate local densities, then draw all noise in one vectorized call
    # (the original looped over points, drawing one row at a time).
    densities = calculate_local_density_scaling(embeddings)

    # More jitter in denser regions
    scales = jitter_strength * (1.0 + densities)
    noise = np.random.normal(0.0, 1.0, embeddings.shape) * scales[:, None]
    return embeddings + noise
def reduce_dimensions(embeddings, method="PCA", n_components=2, spread_factor=1.0,
|
|
perplexity_factor=1.0, min_dist_factor=1.0):
|
|
"""Apply dimensionality reduction with enhanced separation"""
|
|
|
|
# Convert to numpy array if it's not already
|
|
embeddings = np.array(embeddings)
|
|
|
|
print(f"DEBUG: Starting {method} with {len(embeddings)} embeddings, shape: {embeddings.shape}")
|
|
|
|
# Standardize embeddings for better processing
|
|
scaler = StandardScaler()
|
|
scaled_embeddings = scaler.fit_transform(embeddings)
|
|
print(f"DEBUG: Embeddings standardized")
|
|
|
|
# Apply the selected dimensionality reduction method
|
|
if method == "PCA":
|
|
print(f"DEBUG: Applying PCA...")
|
|
reducer = PCA(n_components=n_components, random_state=DEFAULT_RANDOM_STATE)
|
|
reduced_embeddings = reducer.fit_transform(scaled_embeddings)
|
|
# Apply spreading to PCA results
|
|
print(f"DEBUG: Applying spreading...")
|
|
reduced_embeddings = apply_adaptive_spreading(reduced_embeddings, spread_factor)
|
|
|
|
elif method == "t-SNE":
|
|
# Adjust perplexity based on user preference and data size
|
|
base_perplexity = min(30, len(embeddings)-1)
|
|
adjusted_perplexity = max(5, min(50, int(base_perplexity * perplexity_factor)))
|
|
print(f"DEBUG: Applying t-SNE with perplexity {adjusted_perplexity}...")
|
|
|
|
reducer = TSNE(n_components=n_components, random_state=DEFAULT_RANDOM_STATE,
|
|
perplexity=adjusted_perplexity, n_iter=1000,
|
|
early_exaggeration=12.0 * spread_factor, # Increase early exaggeration for more separation
|
|
learning_rate='auto')
|
|
reduced_embeddings = reducer.fit_transform(scaled_embeddings)
|
|
|
|
elif method == "UMAP":
|
|
# Adjust UMAP parameters for better local separation
|
|
n_neighbors = min(15, len(embeddings)-1)
|
|
min_dist = 0.1 * min_dist_factor
|
|
spread = 1.0 * spread_factor
|
|
print(f"DEBUG: Applying UMAP with n_neighbors={n_neighbors}, min_dist={min_dist}...")
|
|
|
|
reducer = umap.UMAP(n_components=n_components, random_state=DEFAULT_RANDOM_STATE,
|
|
n_neighbors=n_neighbors, min_dist=min_dist,
|
|
spread=spread, local_connectivity=2.0)
|
|
reduced_embeddings = reducer.fit_transform(scaled_embeddings)
|
|
|
|
elif method == "Spectral Embedding":
|
|
n_neighbors = min(10, len(embeddings)-1)
|
|
print(f"DEBUG: Applying Spectral Embedding with n_neighbors={n_neighbors}...")
|
|
reducer = SpectralEmbedding(n_components=n_components, random_state=DEFAULT_RANDOM_STATE,
|
|
n_neighbors=n_neighbors)
|
|
reduced_embeddings = reducer.fit_transform(scaled_embeddings)
|
|
# Apply spreading to spectral results
|
|
print(f"DEBUG: Applying spreading...")
|
|
reduced_embeddings = apply_adaptive_spreading(reduced_embeddings, spread_factor)
|
|
|
|
elif method == "Force-Directed":
|
|
# New method: Use force-directed layout for natural spreading
|
|
print(f"DEBUG: Applying Force-Directed layout...")
|
|
reduced_embeddings = force_directed_layout(scaled_embeddings, n_components, spread_factor)
|
|
|
|
else:
|
|
# Fallback to PCA
|
|
print(f"DEBUG: Unknown method {method}, falling back to PCA...")
|
|
reducer = PCA(n_components=n_components, random_state=DEFAULT_RANDOM_STATE)
|
|
reduced_embeddings = reducer.fit_transform(scaled_embeddings)
|
|
reduced_embeddings = apply_adaptive_spreading(reduced_embeddings, spread_factor)
|
|
|
|
print(f"DEBUG: Dimensionality reduction complete. Output shape: {reduced_embeddings.shape}")
|
|
return reduced_embeddings