# Source path: Python/推荐系统/main.py
# Repository viewer metadata: 936 lines, 27 KiB, Python.
#
# NOTE: the repository viewer flagged this file as containing ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
# If this is intentional, the warning can be safely ignored.


# -*- coding: utf-8 -*-
"""
脚本说明:
推荐系统,召回阶段使用基于用户、物品协同过滤算法生成候选物品列表,精排阶段使用时序建模和多头注意力机制进一步精排最终形成推荐物品列表
"""
import heapq
import time
from collections import defaultdict, deque
from itertools import islice
from operator import itemgetter
from typing import Literal
import numpy
from pyLSHash import LSHash
# 导入模块
from pydantic import BaseModel, Field, model_validator
"""
编码区域
"""
class InitializationArguments(BaseModel):
    """Validated configuration consumed by ``RecommenderSystem.__init__``."""

    # Look-back window in days; trades freshness against computation cost.
    time_window: int = Field(30, ge=5, le=360)
    # Decay factor (lambda) controlling how fast interest scores decay;
    # the default of 0 disables decay.
    decay_lambda: float = Field(0, ge=0.00, le=10)
    # Number of dimensions of a user feature vector.
    attributes_dimensions: int = Field(10, ge=2.00, le=200)
class Attributes(BaseModel):
    """User attributes; every field is required."""

    # Age, restricted to 1..99.
    age: int = Field(..., ge=1, le=99)
    # Gender.
    gender: Literal["male", "female"] = Field(...)
    # City.
    city: str = Field(...)
    # Occupation.
    occupation: str = Field(...)
class BehaviorRecord(BaseModel):
    """A single user behavior event (currently only ratings are supported)."""

    # User identifier, exactly 6 characters.
    user: str = Field(default=..., min_length=6, max_length=6)
    # Event timestamp; the consumers in this file compare it with time.time(),
    # i.e. they treat it as seconds since the epoch.
    timestamp: int = Field(default=...)
    # Behavior type.
    type_: Literal["rating"] = Field(default=...)
    # Item identifier, exactly 6 characters.
    item: str = Field(default=..., min_length=6, max_length=6)
    # Rating in 1..5; optional at field level, enforced by the validator below.
    rating: int | None = Field(default=None, ge=1, le=5)

    @model_validator(mode="after")
    def _validate_rating(self):
        """Require ``rating`` whenever the behavior type is ``"rating"``.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If ``type_`` is ``"rating"`` but ``rating`` is missing.
        """
        # Explicit None check: a rating is "missing" only when it is absent,
        # not merely falsy.
        if self.type_ == "rating" and self.rating is None:
            raise ValueError("若行为类型为评分则评分必填")
        return self
class AttributesRecord(BaseModel):
    """A user identifier paired with that user's attributes."""

    # User identifier, exactly 6 characters.
    user: str = Field(..., min_length=6, max_length=6)
    # The user's attributes.
    attributes: Attributes = Field(...)
class RecommenderSystem:
    """Collaborative-filtering recommender (candidate recall).

    Per-user state lives in ``self.behaviors``; item-item cosine similarity is
    maintained incrementally as a numerator per item pair and a squared-norm
    term per item, and an inverted index maps every item to the users that
    currently hold a score for it.
    """

    def __init__(self, **arguments):
        """Validate the configuration and create the empty in-memory stores.

        Args:
            **arguments: Optional ``time_window``, ``decay_lambda`` and
                ``attributes_dimensions``, validated by
                ``InitializationArguments``.
        """
        # Per behavior type: (base interest score, decay weight). The base
        # score of a rating is derived from the rating value, hence None.
        self.behavior_arguments = {"rating": (None, 1)}
        # Validate and parse the initialization arguments.
        arguments = InitializationArguments(**arguments).model_dump()
        self.time_window = arguments.get("time_window")
        self.decay_lambda = arguments.get("decay_lambda")
        self.attributes_dimensions = arguments.get("attributes_dimensions")
        # Per-user behavior store.
        self.behaviors = defaultdict(
            lambda: {
                # item -> interest score
                "scores": defaultdict(float),
                # min-heap of (timestamp, item), used to expire old behavior
                "timestamps_heap": [],
                # most recent items, oldest first
                "history_items": deque(maxlen=200),
            }
        )
        # Components of the item-item cosine similarity:
        # numerator[(i, j)] = sum over users of score(u, i) * score(u, j)
        # denominator_square[i] = sum over users of score(u, i) ** 2
        self.items_similarity_numerator = defaultdict(float)
        self.items_similarity_denominator_square = defaultdict(float)
        # Inverted index: item -> list of users holding a score for it.
        self.items_inversion = defaultdict(list)
        # User feature vectors.
        # NOTE(review): nothing in the visible code writes to this store or to
        # the LSH index below, so the user-based branch never yields
        # candidates until that is implemented.
        self.attributes = defaultdict(lambda: numpy.zeros(self.attributes_dimensions))
        # Locality-sensitive-hashing index over the user feature vectors.
        self.attributes_indexer = LSHash(
            hash_size=8, input_dim=self.attributes_dimensions, num_hashtables=2
        )

    def process_behavior_record(self, behavior_record: dict) -> bool:
        """Validate one behavior record and fold it into the model state.

        The record only changes state when its time-decayed score is higher
        than the score already stored for that user and item.

        Args:
            behavior_record: Mapping accepted by ``BehaviorRecord``.

        Returns:
            Always ``True``; invalid records raise during validation.
        """
        behavior_record = BehaviorRecord(**behavior_record).model_dump()
        user = behavior_record["user"]
        timestamp = behavior_record["timestamp"]
        type_ = behavior_record["type_"]
        item = behavior_record["item"]
        rating = behavior_record["rating"]
        # Drop behavior that has fallen out of the time window first.
        self._reorganize_behaviors(user=user)
        behaviors = self.behaviors[user]
        # .get avoids creating a phantom 0.0 entry for an unseen item.
        score_old = behaviors["scores"].get(item, 0.0)
        score_new = self._calculate_score(
            timestamp=timestamp, type_=type_, rating=rating
        )
        if score_new > score_old:
            history_items = behaviors["history_items"]
            if item in history_items:
                # Re-rated item: move it to the most recent position instead
                # of appending a duplicate, which would double-count the
                # similarity numerators.
                history_items.remove(item)
            elif len(history_items) == history_items.maxlen:
                # The deque would silently evict its oldest item; forget it
                # explicitly so scores, index and similarity stay consistent.
                self._forget_item(user=user, item=history_items[0])
            behaviors["scores"][item] = score_new
            heapq.heappush(behaviors["timestamps_heap"], (timestamp, item))
            history_items.append(item)
            self._update_items_similarity_components(
                user=user, item=item, score_old=score_old, score_new=score_new
            )
            # Index the user under the item only once a score is stored.
            if user not in self.items_inversion[item]:
                self.items_inversion[item].append(user)
        return True

    def _reorganize_behaviors(self, user) -> None:
        """Expire the user's behavior that is older than the time window."""
        timestamps_heap = self.behaviors[user]["timestamps_heap"]
        now = time.time()
        while (
            timestamps_heap
            and (now - timestamps_heap[0][0]) / 86400 > self.time_window
        ):
            _, item_expired = heapq.heappop(timestamps_heap)
            # A re-rated item has several heap entries; the item only expires
            # together with its newest entry.
            if any(item == item_expired for _, item in timestamps_heap):
                continue
            self._forget_item(user=user, item=item_expired)

    def _forget_item(self, user, item) -> None:
        """Remove every trace of ``item`` from the state kept for ``user``."""
        behaviors = self.behaviors[user]
        score_old = behaviors["scores"].pop(item, None)
        if score_old is not None:
            # Withdraw this user's contribution to the similarity components.
            self._update_items_similarity_components(
                user=user, item=item, score_old=score_old, score_new=0
            )
        if item in behaviors["history_items"]:
            behaviors["history_items"].remove(item)
        users = self.items_inversion.get(item)
        if users is not None:
            if user in users:
                users.remove(user)
            # Drop empty index entries.
            if not users:
                del self.items_inversion[item]

    def _calculate_score(self, timestamp, type_, rating) -> float:
        """Return the time-decayed interest score of one behavior.

        Args:
            timestamp: Event time in seconds since the epoch.
            type_: Behavior type, a key of ``self.behavior_arguments``.
            rating: Rating value; used when ``type_`` is ``"rating"``.

        Returns:
            ``0.0`` when the event is older than the time window, otherwise
            ``base * exp(-days * decay_lambda * weight)``.
        """
        # Age of the event in days; clamped so that a timestamp in the future
        # cannot amplify the score.
        time_interval = max((time.time() - timestamp) / 86400, 0.0)
        if time_interval > self.time_window:
            return 0.0
        score_base, weight = self.behavior_arguments[type_]
        if type_ == "rating":
            # Logistic squashing centred on a rating of 3; ratings 1..5 map to
            # roughly 0.2..0.8.
            score_base = 0.1 + 0.8 * (1 / (1 + numpy.exp(3 - rating)))
        return float(
            score_base * numpy.exp(-time_interval * self.decay_lambda * weight)
        )

    def _update_items_similarity_components(self, user, item, score_old, score_new):
        """Apply a score change of ``user`` on ``item`` to the similarity sums.

        Args:
            user: User whose score changed.
            item: Item whose score changed.
            score_old: Score before the change (0 when there was none).
            score_new: Score after the change (0 when the item is removed).
        """
        behaviors = self.behaviors[user]
        scores = behaviors["scores"]
        score_delta = score_new - score_old
        for item_history in behaviors["history_items"]:
            if item_history == item:
                continue
            # Item pairs are stored under a sorted tuple key.
            pair = tuple(sorted((item_history, item)))
            self.items_similarity_numerator[pair] += score_delta * scores.get(
                item_history, 0.0
            )
        # The squared norm of the item changes once per call, independently of
        # how many history items there are.
        self.items_similarity_denominator_square[item] += score_new**2 - score_old**2

    def process_attributes_record(self, attributes_record: dict) -> bool:
        """Validate one attributes record and store its non-empty values.

        Args:
            attributes_record: Mapping accepted by ``AttributesRecord``.

        Returns:
            Always ``True``; invalid records raise during validation.
        """
        attributes_record = AttributesRecord(**attributes_record).model_dump()
        user = attributes_record.get("user")
        # The per-user store has no "attributes" entry by default; create it
        # on first use instead of raising KeyError.
        attributes = self.behaviors[user].setdefault("attributes", {})
        for key, value in attributes_record.get("attributes").items():
            if value:
                attributes[key] = value
        # NOTE(review): the feature vector in self.attributes and the LSH
        # index are not updated here — confirm where that is meant to happen.
        return True

    def generate_recommendations(self, user, k: int = 10) -> dict:
        """Return the top-``k`` recommended items for ``user``.

        Args:
            user: User identifier.
            k: Maximum number of items to return.

        Returns:
            Mapping of item identifier to blended score, highest score first.
        """
        self._reorganize_behaviors(user=user)
        candidates_items = self._generate_items_candidates(user=user, k=k)
        candidates_users = self._generate_users_candidates(user=user, k=k)
        # Weight of the item-based scores: about 0.81 for an empty history,
        # falling towards 0.5 as the history grows.
        alpha_weight = (
            0.4
            / (1 + numpy.exp(0.05 * len(self.behaviors[user]["history_items"]) - 1.2))
            + 0.5
        )
        # An item missing from one of the two candidate lists contributes 0
        # from that list.
        items_candidates = {
            item_candidate: float(
                candidates_items.get(item_candidate, 0.0) * alpha_weight
                + candidates_users.get(item_candidate, 0.0) * (1 - alpha_weight)
            )
            for item_candidate in candidates_items.keys() | candidates_users.keys()
        }
        return dict(
            islice(sorted(items_candidates.items(), key=itemgetter(1), reverse=True), k)
        )

    def _generate_items_candidates(self, user, k):
        """Recall candidate items through item-based collaborative filtering.

        Args:
            user: User identifier.
            k: Requested list size; at most ``k * 10`` candidates are returned.

        Returns:
            Mapping of candidate item to normalized score.
        """
        behaviors = self.behaviors[user]
        # Seed items: the user's history, de-duplicated with order preserved.
        items_heuristic = list(dict.fromkeys(behaviors["history_items"]))
        items_heuristic_set = set(items_heuristic)
        items_recall = defaultdict(lambda: {"counts": 0, "scores": 0})
        # Seed item -> users who also hold it -> their history items.
        for item_heuristic in items_heuristic:
            for user_heuristic in self.items_inversion.get(item_heuristic, ()):
                if user_heuristic == user:
                    continue
                for item_recall in self.behaviors[user_heuristic]["history_items"]:
                    if item_recall not in items_heuristic_set:
                        items_recall[item_recall]["counts"] += 1
        # User sets of the recalled items do not depend on the seed item.
        users_by_item_recall = {
            item_recall: set(self.items_inversion.get(item_recall, ()))
            for item_recall in items_recall
        }
        for item_heuristic in items_heuristic:
            users_heuristic = set(self.items_inversion.get(item_heuristic, ()))
            if not users_heuristic:
                continue
            score_heuristic = behaviors["scores"].get(item_heuristic, 0.0)
            denominator_heuristic = self.items_similarity_denominator_square.get(
                item_heuristic, 0.0
            )
            for item_recall, entry in items_recall.items():
                users_recall = users_by_item_recall[item_recall]
                if not users_recall:
                    continue
                denominator_product = (
                    denominator_heuristic
                    * self.items_similarity_denominator_square.get(item_recall, 0.0)
                )
                # Zero (or slightly negative, from float drift) norms mean the
                # similarity is undefined; such a pair contributes nothing.
                if denominator_product <= 0:
                    continue
                pair = tuple(sorted((item_heuristic, item_recall)))
                similarity = self.items_similarity_numerator.get(
                    pair, 0.0
                ) / numpy.sqrt(denominator_product)
                # Popularity suppression: shared users over the geometric mean
                # of both user counts.
                popularity_suppressed = len(
                    users_heuristic & users_recall
                ) / numpy.sqrt(len(users_heuristic) * len(users_recall))
                entry["scores"] += (
                    score_heuristic * similarity * popularity_suppressed
                )
        return self._normalize_scores(items_recall=items_recall, k=k)

    def _generate_users_candidates(self, user, k):
        """Recall candidate items through user-based collaborative filtering.

        Args:
            user: User identifier.
            k: Number of neighbours to query; at most ``k * 10`` candidates
                are returned.

        Returns:
            Mapping of candidate item to normalized score; empty when the user
            has no non-zero feature vector.
        """
        items_recall = defaultdict(lambda: {"counts": 0, "scores": 0})
        attributes = self.attributes.get(user)
        # A vector counts as initialized as soon as any component is non-zero.
        if attributes is not None and numpy.any(attributes != 0):
            history_items = set(self.behaviors[user]["history_items"])
            # NOTE(review): the three-way unpacking and the use of the second
            # element as a similarity follow the original code — verify them
            # against the pyLSHash ``query`` return format (with a cosine
            # dist_func the value may be a distance).
            for _, similarity, user_recall in self.attributes_indexer.query(
                query_vector=attributes,
                num_results=k,
                dist_func="cosine",
            ):
                behaviors_recall = self.behaviors[user_recall]
                # Items the neighbour has that this user has not.
                for item_recall in behaviors_recall["history_items"]:
                    if item_recall not in history_items:
                        items_recall[item_recall]["counts"] += 1
                        items_recall[item_recall]["scores"] += (
                            behaviors_recall["scores"].get(item_recall, 0.0)
                            * similarity
                        )
        return self._normalize_scores(items_recall=items_recall, k=k)

    @staticmethod
    def _normalize_scores(items_recall, k):
        """Min-max normalize recalled scores into the range 0.2..0.8.

        Args:
            items_recall: Mapping of item to a dict holding a ``"scores"`` value.
            k: Requested list size; at most ``k * 10`` entries are returned.

        Returns:
            Mapping of item to normalized score, highest first. When all
            scores are equal every item gets 0.8.
        """
        candidates = {}
        if items_recall:
            scores = [value["scores"] for value in items_recall.values()]
            scores_minimum, scores_maximum = min(scores), max(scores)
            scores_range = scores_maximum - scores_minimum
            for item_recall, value in items_recall.items():
                if scores_range != 0:
                    candidates[item_recall] = (
                        (value["scores"] - scores_minimum) / scores_range
                    ) * 0.6 + 0.2
                else:
                    candidates[item_recall] = 0.8
        return dict(
            islice(
                sorted(candidates.items(), key=itemgetter(1), reverse=True),
                k * 10,
            )
        )
if __name__ == "__main__":
    # Demo run: feed a handful of ratings and print the recommendations
    # generated for user "aaaaaa".
    recommender = RecommenderSystem()
    # (user, item, age of the event in seconds, rating)
    demo_ratings = (
        ("aaaaaa", "111111", 3600, 4),
        ("aaaaaa", "333333", 3200, 4),
        ("bbbbbb", "333333", 3200, 4),
        ("cccccc", "111111", 3200, 5),
        ("cccccc", "222222", 3200, 5),
        ("cccccc", "444444", 3200, 3),
    )
    feedback_records = [
        {
            "user": user,
            "item": item,
            "type_": "rating",
            "timestamp": int(time.time() - seconds_ago),
            "rating": rating,
        }
        for user, item, seconds_ago, rating in demo_ratings
    ]
    for feedback_record in feedback_records:
        recommender.process_behavior_record(behavior_record=feedback_record)
    recommendations = recommender.generate_recommendations(user="aaaaaa")
    print(recommendations)
    exit()
"""
import numpy
import pandas
import zipcodes
import re
from collections import Counter
from scipy.stats import chisquare
from scipy.stats.contingency import association
from sklearn.preprocessing import OneHotEncoder, StandardScaler
import math
from minisom import MiniSom
from sklearn.cluster import KMeans
import warnings
#忽略警告
warnings.simplefilter('ignore')
import sys
sys.path.append('..')
from utils.pandas2chart import Pandas2chart
from utils.algorithms import OptimalClusters
#本脚本中所调用的函数
#提取性别特征时将特征值“M”映射为“male”“F”映射为“female”
def Gender(element):
match element:
case 'M':
return 'male'
case 'F':
return 'female'
case default:
return numpy.nan
#提取年龄特征时将小于18岁映射为“under18”大于等于18岁且小于等于24岁映射为“18~24”大于等于25岁且小于等于34岁映射为“25~34”大于等于35岁且小于等于44岁映射为“35~44”大于等于45岁且小于等于54岁映射为“45~54”大于54岁映射为“above54””
def Age(element):
match element:
case age if age > 54:
return 'above54'
case age if age >= 45:
return '45~54'
case age if age >= 35:
return '35~44'
case age if age >= 25:
return '25~34'
case age if age >= 18:
return '18~24'
case age if age < 18:
return 'under18'
case default:
return numpy.nan
#提取职业特征时根据映射表映射
def Occupation(element):
match element:
case 0:
return 'other'
case 1:
return 'academic/educator'
case 2:
return 'artist'
case 3:
return 'clerical/admin'
case 4:
return 'college/grad student'
case 5:
return 'customer service'
case 6:
return 'doctor/health care'
case 7:
return 'executive/managerial'
case 8:
return 'farmer'
case 9:
return 'homemaker'
case 10:
return 'k-12 student'
case 11:
return 'lawyer'
case 12:
return 'programmer'
case 13:
return 'retired'
case 14:
return 'sales/marketing'
case 15:
return 'scientist'
case 16:
return 'self-employed'
case 17:
return 'technician/engineer'
case 18:
return 'tradesman/craftsman'
case 19:
return 'unemployed'
case 20:
return 'writer'
case default:
return numpy.nan
#提取州级行政区特征时根据邮政编码模糊查询州级行政区若为空或多个则映射为“null”否则为查询结果
def State(element):
#校验邮政编码格式
if not re.match(r'^\d{4,5}$|^\d{5}-\d{4}$', element):
#若邮政编码由9位数字组成修改邮政编码格式否则为“null”
if re.match(r'^\d{9}$', element):
element = element[: 5] + '-' + element[-4: ]
else:
return numpy.nan
#根据邮政编码模糊查询并解析州级行政区
states = [element.get('state') for element in zipcodes.similar_to(element)]
#若州级行政区数为1则将查询结果定义为州级行政区否则为“null”
if len(set(states)) == 1:
return states[0]
else:
return numpy.nan
#提取最近最喜欢的电影体裁特征
def Genres(series):
#合并每位用户评价过的电影体裁并切割为单个
genres = series.str.cat(sep = '|').split('|')
#查询数量最多的电影体裁并返回
return str.lower(Counter(genres).most_common(1)[0][0])
#独热编码特征名组合器
def Combiner(feature, category):
return str(feature) + ':' + str(category)
#若本脚本被调用报错
if __name__ != '__main__':
print('本脚本不允许被调用')
print()
exit()
print('1 打开本地数据文件,读取数据集...', end = '')
try:
dataset_users = pandas.read_csv(filepath_or_buffer = './MovieLens10K/users.csv', low_memory = False)
dataset_movies = pandas.read_csv(filepath_or_buffer = './MovieLens10K/movies.csv', low_memory = False)
dataset_ratings = pandas.read_csv(filepath_or_buffer ='./MovieLens10K/ratings.csv', low_memory = False)
except:
print('读取失败,请检查数据文件是否存在或正确')
print()
exit()
print('已完成')
print()
#评分数据集根据电影标识关联电影名称和体裁
dataset_ratings = dataset_ratings.merge(right = dataset_movies[['movieId', 'title', 'genres']], how = 'left', on = 'movieId')
#统计用户数
users = dataset_users.shape[0]
print('2 构建标签体系')
print()
print('2.1 提取特征...', end = '')
dataset = pandas.DataFrame(data = dataset_users['userId'].tolist(), columns = ['userId'])
#提取性别特征
dataset['gender'] = dataset_users['gender'].map(lambda element: Gender(element))
#提取年龄特征
dataset['age'] = dataset_users['age'].map(lambda element: Age(element))
#提取职业特征
dataset['occupation'] = dataset_users['occupation'].map(lambda element: Occupation(element))
#提取州级行政区特征
#dataset['state'] = dataset_users['zip'].map(lambda element: State(element))
#就评分数据集按照userId分组统计每位用户最喜欢的体裁
pivottable_ratings = dataset_ratings.groupby(by = 'userId').agg(
#最喜欢的电影体裁
genres = pandas.NamedAgg(column = 'genres', aggfunc = Genres)
)
pivottable_ratings.reset_index(inplace = True)
#合并分箱后评分数和平均评分,另最喜欢的电影体裁
dataset = dataset.merge(right = pivottable_ratings[['userId', 'genres']], how = 'left', on = 'userId')
#删除用户ID和包含缺失值的样本
dataset.pop('userId').dropna(inplace = True)
print('已完成')
print()
#统计样本数
samples = dataset.shape[0]
#获取特征名称
independents = dataset.columns
print('特征数据集中样本数为 %d 例,特征数为 %d 个。' % (samples, len(independents)))
print()
print('2.2 检验各特征各项样本数是否符合均匀分布')
print()
for independent in independents:
#按照特征分组,统计各项样本数
pivottable = dataset.groupby(by = independent).agg(
samples = pandas.NamedAgg(column = independent, aggfunc = 'count')
)
#检验各项样本数是否均匀分布
statistic, probability = chisquare(f_obs = pivottable['samples'].to_numpy())
if probability < 0.05:
print('特征 %s 各项样本数不符合均匀分布,卡方统计量为 %.2f,概率为 %.2f' % (independent, statistic, probability))
print()
else:
print('特征 %s 各项样本数符合均匀分布,卡方统计量为 %.2f,概率为 %.2f' % (independent, statistic, probability))
print()
pivottable.reset_index(inplace = True)
#按照样本数倒序排序
pivottable.sort_values(by = 'samples', ascending = False, inplace = True)
#若项数大于指定值则将第指定值项至最后一项合并为一项指定值为6
if pivottable.shape[0] > 6:
pivottable_marging = pivottable.iloc[: 5]
#合并后的项名为others统计第指定值-1项至最后一项样本数的和
pivottable_marging.loc[pivottable_marging.shape[0]] = ['others', pivottable.iloc[5: , 1].sum()]
else:
pivottable_marging = pivottable
#生成环形图
Pandas2chart(dataset = pivottable_marging, type = 'circular', path = './reports/persona_report/circular_{}.html'.format(independent))
print('2.3 统计特征之间相关系数')
print()
#用于保存特征之间克莱姆相关系数矩阵
correlation_matrix = pandas.DataFrame(data = [], index = independents, columns = independents)
#用于保存相关特征对
correlation_pairs = []
for index, independent_index in enumerate(independents):
for column, independent_column in enumerate(independents):
#统计特征之间克莱姆相关系数
statistic = round(association(observed = pandas.crosstab(index = dataset[independent_index], columns = dataset[independent_column])), 2)
correlation_matrix.loc[independent_index, independent_column] = statistic
#获取相关特征对
if column > index and statistic >= 0.25:
correlation_pairs.append({'independent': independent_index, 'independent_correlation': independent_column})
#生成相关系数矩阵热力图
Pandas2chart(dataset = correlation_matrix, type = 'heatmap', path = './reports/persona_report/heatmap_correlation_matrix.html')
print('3、构建用户细分群体')
print()
print('3.1 独热编码特征并标准化...', end = '')
#独热编码特征,用于决策树算法模型
onehot_encoder = OneHotEncoder(sparse_output = False, handle_unknown = 'ignore', feature_name_combiner = Combiner).fit(X = dataset.to_numpy())
dataset_processing = pandas.DataFrame(data = onehot_encoder.transform(X = dataset.to_numpy()), columns = onehot_encoder.get_feature_names_out(input_features = independents)).astype(dtype = 'int')
#独热编码特征
dataset_preprocessing = OneHotEncoder(sparse_output = False, handle_unknown = 'ignore').fit_transform(X = dataset.to_numpy())
#标准化特征
dataset_preprocessing = StandardScaler().fit_transform(X = dataset_preprocessing)
print('已完成')
print()
print('3.2 基于自我组织映射算法初步聚类...', end = '')
#定义竞争层的长度和高度(经验值)
competitive_layer_length = competitive_layer_heigth = math.ceil(2.25 * math.pow(samples, 0.25))
#创建自我组织映射算法模型
som = MiniSom(x = competitive_layer_length, y = competitive_layer_heigth, input_len = dataset_preprocessing.shape[1], sigma = math.sqrt(math.pow(competitive_layer_length, 2) + math.pow(competitive_layer_heigth, 2)), activation_distance = 'cosine', random_seed = 0)
#初始化模型
som.pca_weights_init(data = dataset_preprocessing)
#训练模型
som.train_batch(data = dataset_preprocessing, num_iteration = 10)
#获取各样本的竞争层中优胜点坐标
dataset_preprocessing = [som.winner(record) for record in dataset_preprocessing]
dataset_preprocessing = pandas.DataFrame(data = dataset_preprocessing, columns = ['axis_x', 'axis_y'])
print('已完成')
print()
print('3.3 就各样本的竞争层中优胜点坐标基于K均值算法再次聚类使用间隔统计量评估聚类效果并确定最优聚类簇数...', end = '')
#创建K均值算法模型并训练
kmeans = KMeans(n_clusters = OptimalClusters(dataset_preprocessing.to_numpy()), n_init = 'auto').fit(dataset_preprocessing.to_numpy())
dataset_processing['cluster_label'] = kmeans.labels_
print('已完成')
print()
print('3.4 基于决策树拟合聚类结果并输出聚类规则...', end = '')
print(dataset_processing['cluster_label'].max())
"""