from typing import List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from implicit.als import AlternatingLeastSquares
from implicit.evaluation import AUC_at_k, precision_at_k
from scipy import sparse as sps
from .base import BaseRecommender
[docs]class IALSRecommender(BaseRecommender):
"""iALS recommender based on `implicit`.
Args:
interaction (pd.DataFrame): user-item interaction matrix.
items (Optional[pd.DataFrame]): item side information.
factors (int): the dimensions of user/item embeddings.
regularization (float): regularization coefficient.
alpha (float): the unobserved weight.
"""
def __init__(
self,
df_interaction: pd.DataFrame,
items: pd.DataFrame,
test_size: Union[float, int],
random_split: bool = False,
factors: int = 300,
regularization: float = 0.03,
alpha: float = 0.6,
iterations: int = 10,
random_state: Optional[int] = 42,
toppop_mask: Optional[np.ndarray] = None,
) -> None:
super().__init__(df_interaction, items, test_size, random_split, toppop_mask)
self.ials = AlternatingLeastSquares(
factors=factors,
regularization=regularization,
alpha=alpha,
iterations=iterations,
random_state=random_state,
calculate_training_loss=True,
)
self.train_mat = self.bm25(self.train_mat)
[docs] def bm25(self, X: sps.coo_matrix, K1: int = 100, B: float = 0.8) -> sps.csr_matrix:
r"""Weighs each col of a sparse matrix X by BM25 weighting.
Taken from `nearest_neighbours.py of implicit <https://github.com/benfred/implicit/blob/main/implicit/nearest_neighbours.py>`_
"""
X = X.T
N = float(X.shape[0])
idf = np.log(N) - np.log1p(np.bincount(X.col))
row_sums = np.ravel(X.sum(axis=1))
average_length = row_sums.mean()
length_norm = (1.0 - B) + B * row_sums / average_length
X.data = X.data * (K1 + 1.0) / (K1 * length_norm[X.row] + X.data) * idf[X.col]
return X.T.tocsr()
def _fit(self) -> None:
self.ials.fit(2 * self.train_mat)
def recommend(self, user_ids: List[int]) -> tuple:
ids, scores = self.ials.recommend(
user_ids, self.train_mat[user_ids], N=self.n_items
)
id_list: List = [list(id) for id in ids]
return (id_list, scores)
[docs] def recommend_single(self, user_string: str, top_k: int = 100) -> List:
"""Recommend for single user with `top_k` items.
Args:
user_string (str): the original token string for user.
top_k (int, optional): `top_k` items to be recommended. Defaults to 100.
Returns:
List: a list of recommended item ids.
"""
if user_string in self.user_array:
user_id = self.get_user_id(user_string)
ids, _ = self.ials.recommend(user_id, self.train_mat[user_id], N=top_k)
indices = np.asarray(ids)
else:
rank, _ = self.toppop
indices = rank[:top_k]
return [self.item_array[index] for index in indices]
def auc_score(self, top_k: int = 100) -> float:
return float(AUC_at_k(self.ials, self.train_mat, self.test_mat, K=top_k))
def precision_at_top_k(self, top_k: int = 100) -> float:
return float(precision_at_k(self.ials, self.train_mat, self.test_mat, K=top_k))
def get_item_factors(self) -> np.ndarray:
return np.asarray(self.ials.item_factors)
def get_user_factors(self) -> np.ndarray:
return np.asarray(self.ials.user_factors)
def mask_items(self, keep_row: np.ndarray) -> None:
mask = np.ones(self.ials.item_factors.shape[0], dtype=bool)
mask[keep_row] = False
self.ials.item_factors[mask] = 0
[docs] def get_score_single_user(
self, user_string: str, keep_indices: np.ndarray
) -> Optional[np.ndarray]:
"""Get the single user's predictions scores for the filtered items.
Return `None` for new users.
Args:
user_string (str): Original user token string.
keep_indices (np.ndarray): Items to be kept based on filters.
Returns:
Optional[np.ndarray]:
Predictions for the given items.
Return `None` for new users.
"""
user_id = self.get_user_id(user_string)
if user_id:
user_factor = self.get_user_factors()[user_id]
item_factors = self.get_item_factors()[keep_indices]
scores = np.asarray(user_factor @ item_factors.T)
return scores
else:
return None
[docs] def get_topk_single_user(
self,
user_string: str,
keep_indices: np.ndarray,
top_k: int,
) -> Tuple[np.ndarray, np.ndarray]:
"""Get the recommended item ids for a given user id.
Args:
user_string (str): User id string.
keep_indices (np.ndarray): Indices for items to be kept.
top_k (int): Top-k items to be recommended.
Returns:
Tuple[np.ndarray, np.ndarray]:
Recommended items ids and the corresponding scores.
Return `toppop` for new users.
"""
scores = self.get_score_single_user(user_string, keep_indices)
if scores is None:
indices, scores = self._get_toppop(keep_indices)
return (indices[:top_k], scores[:top_k])
else:
rank = self.get_topk_indices(scores, top_k)
indices = keep_indices[rank]
return (self.item_array[indices], scores[rank])
def predict(
self,
user_ids: np.ndarray,
item_ids: np.ndarray,
user_features: Optional[sps.csr_matrix] = None,
item_features: Optional[sps.csr_matrix] = None,
) -> np.ndarray:
user_factors = self.ials.user_factors[user_ids]
item_factors = self.ials.item_factors[item_ids]
predict_array: np.ndarray = np.asarray(
[user @ item for user, item in zip(user_factors, item_factors)]
)
return predict_array
def rerank_preprocess(
self, user_id: int, truncate_at: int, category_col: str, embedding_col: str
) -> Tuple:
item_clean = self.clean_items()
category = item_clean[category_col].to_list()
embedding = np.stack(item_clean[embedding_col])
org_rank = self.recommend([user_id])[0][0]
org_scores = self.recommend([user_id])[1][0]
relevance_scores = org_scores[:truncate_at]
org_select = org_rank[:truncate_at]
similarity_scores = embedding[org_select]
similarity_matrix = similarity_scores @ similarity_scores.T
return (org_select, category, relevance_scores, similarity_matrix)