# All imports
# Math
import math
import random
import cv2
import numpy as np
from scipy.stats import pearsonr
from collections import defaultdict
# from scipy.sparse.linalg import svds
# from sklearn.decomposition import NMF
from sklearn.decomposition import LatentDirichletAllocation
# from sklearn.cluster import KMeans
# Torch
import torch
import torchvision.transforms as transforms
from torchvision.datasets import Caltech101
from torchvision.models import resnet50, ResNet50_Weights
import tensorly as tl
import heapq
# OS and env
import json
import os
from os import getenv
from dotenv import load_dotenv
import warnings
from joblib import dump, load
load_dotenv()
# MongoDB
from pymongo import MongoClient
# Visualizing
import matplotlib.pyplot as plt
NUM_LABELS = 101
NUM_IMAGES = 4338
valid_classification_methods = {
    "m-nn": 1,
    "decision-tree": 2,
    "ppr": 3,
}


def getCollection(db, collection):
    """Load feature descriptor collection from MongoDB"""
    client = MongoClient("mongodb://localhost:27017")
    return client[db][collection]


def euclidean_distance_measure(img_1_fd, img_2_fd):
    """Euclidean distance between two feature descriptors, flattened to 1-D"""
    img_1_fd_reshaped = img_1_fd.flatten()
    img_2_fd_reshaped = img_2_fd.flatten()
    # Calculate Euclidean distance
    return math.dist(img_1_fd_reshaped, img_2_fd_reshaped)
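

# Illustrative sketch (not part of the original pipeline): a quick sanity check
# of euclidean_distance_measure on two made-up 2x2 feature descriptors.
def _demo_euclidean_distance():
    fd_a = np.array([[0.0, 0.0], [0.0, 0.0]])
    fd_b = np.array([[1.0, 2.0], [2.0, 0.0]])
    # Flattened vectors differ by (1, 2, 2, 0), so the distance is sqrt(1 + 4 + 4) = 3.0
    return euclidean_distance_measure(fd_a, fd_b)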


valid_feature_models = {
    "cm": "cm_fd",
    "hog": "hog_fd",
    "avgpool": "avgpool_fd",
    "layer3": "layer3_fd",
    "fc": "fc_fd",
    "resnet": "resnet_fd",
}


class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value


class DecisionTree:
    """Decision tree classifier: splits on the feature threshold that maximizes information gain"""

    def __init__(self, max_depth=None):
        self.max_depth = max_depth
        self.tree = None

    def entropy(self, y):
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        return -np.sum(probabilities * np.log2(probabilities))

    def information_gain(self, X, y, feature, threshold):
        left_idxs = X[:, feature] <= threshold
        right_idxs = ~left_idxs
        left_y = y[left_idxs]
        right_y = y[right_idxs]
        p_left = len(left_y) / len(y)
        p_right = len(right_y) / len(y)
        gain = self.entropy(y) - (p_left * self.entropy(left_y) + p_right * self.entropy(right_y))
        return gain

    def find_best_split(self, X, y):
        best_gain = 0
        best_feature = None
        best_threshold = None
        for feature in range(X.shape[1]):
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                gain = self.information_gain(X, y, feature, threshold)
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold
        return best_feature, best_threshold

    def build_tree(self, X, y, depth=0):
        # Stop when the node is pure or the maximum depth is reached
        if len(np.unique(y)) == 1 or depth == self.max_depth:
            return Node(value=np.argmax(np.bincount(y)))
        best_feature, best_threshold = self.find_best_split(X, y)
        if best_feature is None:
            return Node(value=np.argmax(np.bincount(y)))
        left_idxs = X[:, best_feature] <= best_threshold
        right_idxs = ~left_idxs
        left_subtree = self.build_tree(X[left_idxs], y[left_idxs], depth + 1)
        right_subtree = self.build_tree(X[right_idxs], y[right_idxs], depth + 1)
        return Node(feature=best_feature, threshold=best_threshold, left=left_subtree, right=right_subtree)

    def fit(self, X, y):
        X = np.array(X)  # Convert to NumPy array
        y = np.array(y)  # Convert to NumPy array
        self.tree = self.build_tree(X, y)

    def predict_instance(self, x, node):
        if node.value is not None:
            return node.value
        if x[node.feature] <= node.threshold:
            return self.predict_instance(x, node.left)
        else:
            return self.predict_instance(x, node.right)

    def predict(self, X):
        X = np.array(X)  # Convert to NumPy array
        predictions = []
        for x in X:
            pred = self.predict_instance(x, self.tree)
            predictions.append(pred)
        return np.array(predictions)
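

# Illustrative sketch (not part of the original pipeline): fitting the
# DecisionTree above on a tiny made-up dataset with integer class labels
# (np.bincount in build_tree requires non-negative integer labels).
def _demo_decision_tree():
    X_demo = [[2.0, 3.0], [1.0, 1.0], [3.0, 2.5], [0.5, 0.2]]
    y_demo = [1, 0, 1, 0]
    clf = DecisionTree(max_depth=3)
    clf.fit(X_demo, y_demo)
    # The learned split separates the two classes, so this returns array([1, 0])
    return clf.predict([[2.5, 2.0], [0.4, 0.3]])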


class LSHIndex:
    """Locality-sensitive hashing index using random-hyperplane (sign) hash functions"""

    def __init__(self, num_layers, num_hashes, dimensions, seed=42):
        self.num_layers = num_layers
        self.num_hashes = num_hashes
        self.dimensions = dimensions
        self.index = [defaultdict(list) for _ in range(num_layers)]
        self.hash_functions = self._generate_hash_functions(seed)

    def _generate_hash_functions(self, seed):
        np.random.seed(seed)
        hash_functions = []
        for _ in range(self.num_layers):
            layer_hashes = []
            for _ in range(self.num_hashes):
                random_projection = np.random.randn(self.dimensions)
                random_projection /= np.linalg.norm(random_projection)
                layer_hashes.append(random_projection)
            hash_functions.append(layer_hashes)
        return hash_functions

    def hash_vector(self, vector):
        hashed_values = []
        for i in range(self.num_layers):
            layer_hashes = self.hash_functions[i]
            layer_hash = [int(np.dot(vector, h) > 0) for h in layer_hashes]
            hashed_values.append(tuple(layer_hash))
        return hashed_values

    def add_vector(self, vector, image_id):
        hashed = self.hash_vector(vector)
        for i in range(self.num_layers):
            self.index[i][hashed[i]].append((image_id, vector))

    def query(self, query_vector):
        hashed_query = self.hash_vector(query_vector)
        # Deduplicate candidates by image_id; NumPy vectors are unhashable,
        # so (image_id, vector) tuples cannot be collected directly in a set
        candidates = {}
        for i in range(self.num_layers):
            for image_id, vector in self.index[i][hashed_query[i]]:
                candidates[image_id] = vector
        return list(candidates.items())

    def query_t_unique(self, query_vector, t):
        hashed_query = self.hash_vector(query_vector)
        candidates = []
        unique_vectors = set()  # Track unique vectors considered
        for i in range(self.num_layers):
            candidates.extend(self.index[i][hashed_query[i]])
        # Calculate Euclidean distance between query and candidate vectors
        distances = []
        for candidate in candidates:
            unique_vectors.add(tuple(candidate[1]))  # Adding vectors to track uniqueness
            distance = np.linalg.norm(candidate[1] - query_vector)  # candidate is (image_id, vector)
            distances.append(distance)
        # Sort candidates by Euclidean distance and keep the t nearest unique candidates
        unique_similar_vectors = []
        seen_ids = set()
        for distance, candidate in sorted(zip(distances, candidates), key=lambda pair: pair[0]):
            if len(unique_similar_vectors) >= t:
                break
            if candidate[0] not in seen_ids:
                seen_ids.add(candidate[0])
                unique_similar_vectors.append(tuple(candidate))
        return list(unique_similar_vectors), len(unique_vectors), len(candidates)
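

# Illustrative sketch (not part of the original pipeline): indexing a few
# random vectors and retrieving the nearest neighbours of a perturbed copy of
# one of them; the sizes and parameters below are made up for demonstration.
def _demo_lsh_index():
    rng = np.random.default_rng(0)
    lsh = LSHIndex(num_layers=4, num_hashes=6, dimensions=8)
    vectors = rng.standard_normal((20, 8))
    for image_id, vector in enumerate(vectors):
        lsh.add_vector(vector, image_id)
    query_vector = vectors[0] + 0.01 * rng.standard_normal(8)
    nearest, num_unique, num_considered = lsh.query_t_unique(query_vector, t=3)
    # nearest is a list of (image_id, vector) tuples; image 0 should rank first
    return [image_id for image_id, _ in nearest], num_unique, num_considered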


def extract_latent_semantics_from_feature_model(
    fd_collection,
    k,
    feature_model,
):
    """
    Extract k latent semantics for a given feature_model by applying SVD to the
    matrix of label-representative feature vectors
    """
    label_features = np.array([
        np.array(
            calculate_label_representatives(fd_collection, label, feature_model)
        ).flatten()  # get the label's representative vector under this feature model
        for label in range(NUM_LABELS)
        # repeat for all labels
    ])
    print(
        "Applying {} on the {} space to get {} latent semantics.".format(
            "svd", feature_model, k
        )
    )
    U, S, V_T = svd(label_features, k=k)
    # Discard any spurious imaginary parts introduced by the eigendecomposition
    U = [C.real for C in U]
    S = [C.real for C in S]
    V_T = [C.real for C in V_T]
    all_latent_semantics = {
        "image-semantic": U,
        "semantics-core": S,
        "semantic-feature": V_T,
    }
    # latent semantics are returned unsorted; callers can sort label-weight pairs per semantic as needed
    return all_latent_semantics


def calculate_label_representatives(fd_collection, label, feature_model):
    """Calculate the representative feature vector of a label as the mean of its feature vectors under a feature model"""
    label_fds = [
        np.array(
            img_fds[feature_model]
        ).flatten()  # get the specific feature model's feature vector
        for img_fds in fd_collection.find(
            {"true_label": label, "$mod": [2, 0]}
        )  # repeat for all (even-numbered) images of this label
    ]
    # Calculate the mean across each dimension
    # and build a mean vector out of these means
    label_mean_vector = [sum(col) / len(col) for col in zip(*label_fds)]
    return label_mean_vector
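

# Illustrative sketch (not part of the original pipeline): exercising the
# averaging logic of calculate_label_representatives with a tiny stand-in
# "collection" whose find() yields made-up documents, so no MongoDB instance
# is needed; the field name "cm_fd" follows valid_feature_models.
class _FakeCollection:
    def __init__(self, docs):
        self.docs = docs

    def find(self, _query):
        # Ignores the query filter; only used to feed documents to the function
        return list(self.docs)


def _demo_label_representative():
    docs = [
        {"true_label": 0, "cm_fd": [[1.0, 2.0], [3.0, 4.0]]},
        {"true_label": 0, "cm_fd": [[3.0, 4.0], [5.0, 6.0]]},
    ]
    fake = _FakeCollection(docs)
    # Mean of the two flattened descriptors -> [2.0, 3.0, 4.0, 5.0]
    return calculate_label_representatives(fake, 0, "cm_fd")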


def svd(matrix, k):
    """Top-k singular value decomposition via the eigendecomposition of the covariance matrix"""
    # Step 1: Compute the covariance matrix
    cov_matrix = np.dot(matrix.T, matrix)
    # Step 2: Compute the eigenvalues and eigenvectors of the covariance matrix
    eigenvalues, eigenvectors = np.linalg.eig(cov_matrix)
    # Step 3: Sort the eigenvalues and corresponding eigenvectors in descending order
    sort_indices = eigenvalues.argsort()[::-1]
    eigenvalues = eigenvalues[sort_indices]
    eigenvectors = eigenvectors[:, sort_indices]
    # Step 4: Compute the singular values and the left and right singular vectors
    # (the eigenvectors of matrix.T @ matrix are the right singular vectors)
    singular_values = np.sqrt(eigenvalues)
    left_singular_vectors = np.dot(matrix, eigenvectors)
    right_singular_vectors = eigenvectors
    # Step 5: Normalize the left singular vectors: matrix @ V equals U * S, so
    # dividing each column by its singular value recovers U. The right singular
    # vectors are already unit-norm eigenvectors and need no rescaling.
    for i in range(left_singular_vectors.shape[1]):
        left_singular_vectors[:, i] /= singular_values[i]
    # Keep only the top k singular values and their corresponding vectors
    singular_values = singular_values[:k]
    left_singular_vectors = left_singular_vectors[:, :k]
    right_singular_vectors = right_singular_vectors[:, :k]
    return left_singular_vectors, np.diag(singular_values), right_singular_vectors.T
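

# Illustrative sketch (not part of the original pipeline): checking the custom
# svd above against NumPy's reference implementation on a small random matrix;
# the shape and tolerance below are made up for demonstration only.
def _demo_svd():
    rng = np.random.default_rng(1)
    A = rng.standard_normal((6, 4))
    k = 3
    U, S, V_T = svd(A, k=k)
    # Compare the leading singular values; singular vectors may differ in sign
    reference = np.linalg.svd(A, compute_uv=False)[:k]
    return np.allclose(np.diag(S).real, reference, atol=1e-6)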