251029更新

This commit is contained in:
marslbr 2025-10-29 18:58:26 +08:00
parent a6fe763c0f
commit c2229eebc7
1 changed files with 194 additions and 155 deletions

View File

@ -1,55 +1,50 @@
# -*- coding: utf-8 -*-
'''
"""
脚本说明
推荐系统召回阶段使用基于用户物品协同过滤算法生成候选物品列表精排阶段使用时序建模和多头注意力机制进一步精排最终形成推荐物品列表
'''
"""
import heapq
import time
from collections import defaultdict, deque
from itertools import islice
from operator import itemgetter
from typing import Literal
import numpy
from pyLSHash import LSHash
# 导入模块
from pydantic import BaseModel, Field, model_validator
from typing import Literal
from itertools import islice
from operator import itemgetter
import time
import numpy
from collections import deque, defaultdict
import heapq
from pyLSHash import LSHash
'''
"""
编码区域
'''
"""
# 数据模型:初始化参数
class InitializationArguments(BaseModel):
"""推荐系统初始化参数数据模型"""
# 时间窗口(单位为天)
# 时间窗口(单位为天),平衡实时性和运算效率
time_window: int = Field(default=30, ge=5, le=360)
# 衰减兰布达因子
# 衰减因子兰布达系数,控制兴趣分数衰减速率
decay_lambda: float = Field(default=0, ge=0.00, le=10)
# 用户特征向量维度数
attributes_dimensions: int = Field(default=10, ge=2.00, le=200)
# 数据模型:用户属性
class Attributes(BaseModel):
"""用户属性数据模型"""
# 年龄,数据类型为整数
# 年龄
age: int = Field(default=..., ge=1, le=99)
# 性别
gender: Literal['male', 'female'] = Field(default=...)
gender: Literal["male", "female"] = Field(default=...)
# 市
city: str = Field(default=...)
@ -58,79 +53,67 @@ class Attributes(BaseModel):
occupation: str = Field(default=...)
# 数据模型:设置记录
class SettingRecord(BaseModel):
class Record(BaseModel):
"""记录数据模型"""
# 用户,数据类型为字符,无默认值,必填,六位
# 用户标识
user: str = Field(default=..., min_length=6, max_length=6)
# 用户属性
# 用户属性
attributes: Attributes = Field(default=...)
# 数据模型:用户行为记录
class BehaviorRecord(BaseModel):
"""用户行为记录数据模型"""
# 用户标识,数据类型为字符,无默认值,必填,六位
# 用户标识
user: str = Field(default=..., min_length=6, max_length=6)
# 时间戳,数据类型为时间戳,必填
# 时间戳
timestamp: int = Field(default=...)
# 行为类型,数据类型为列表,指定范围
type: Literal['rating'] = Field(default=...)
# 行为类型
type: Literal["rating"] = Field(default=...)
# 物品标识,数据类型为字符,无默认值,必填,六位
# 物品标识
item: str = Field(default=..., min_length=6, max_length=6)
# 评分,数据类型为整数或空默认为空大于等于1小于等于5
# 评分
rating: int | None = Field(default=None, ge=1, le=5)
# 校验规则:若行为类型为评分则评分非空
@model_validator(mode='after')
@model_validator(mode="after")
def _validate_rating(self):
if self.type == 'rating' and not self.rating:
raise ValueError(
'the rating parameter is required when type=rating')
if self.type == "rating" and not self.rating:
raise ValueError("若行为类型为评分则拼分必填")
return self
# 原型级推荐系统
class PrototypeRecommender:
class RecommenderSystem:
"""基于协同过滤的推荐系统"""
def __init__(self, **arguments):
"""初始化推荐系统"""
# 行为参数(不同行为赋予不同的的基础兴趣分数和衰减因子权重。其中,显式行为兴趣分数根据评分转化获得
self.behavior_arguments = {'rating': (None, 1)}
# 行为参数配置项(不同行为类型配置不同的的兴趣基础分数和衰减权重
self.behavior_arguments = {"rating": (None, 1)}
# 校验初始化参数数据模型并转为初始参数字典
# 校验并解析初始化参数
arguments = InitializationArguments(**arguments).model_dump()
self.time_window = arguments.get("time_window")
self.decay_lambda = arguments.get("decay_lambda")
self.attributes_dimensions = arguments.get("attributes_dimensions")
# 时间窗口,单位为天(将连续数据流切割为有限片段,平衡推荐系统实时性和运算效率)
self.time_window = arguments.get('time_window')
# 衰减因子兰布达系数(时间窗口内兴趣分数随时间衰减程度)
self.decay_lambda = arguments.get('decay_lambda')
# 用户特征向量维度数
self.attributes_dimensions = arguments.get('attributes_dimensions')
# 用户行为数据体(基于物品协同过滤的核心数据体)
self.behaviors = defaultdict(lambda: {
# 兴趣分数列表(时间窗口内由物品标识和兴趣分数组成的字典),为最新兴趣分数,形如{'物品标识': '兴趣分数'}
'scores': defaultdict(float),
# 时间戳堆(时间窗口内由时间戳和物品标识组成的元组),例如('时间戳', '物品标识')
'timestamps_heap': [],
# 历史物品标识列表默认最多保存200例历史物品标识平衡推荐系统实时性和运算效率形如'物品标识'
'items_history': deque(maxlen=200)
})
# 用户行为数据储存对象
self.behaviors = defaultdict(
lambda: {
# 兴趣分数列表(时间窗口内由物品标识和兴趣分数组成的字典),为最新兴趣分数,形如{'物品标识': '兴趣分数'}
"scores": defaultdict(float),
# 时间戳堆(时间窗口内由时间戳和物品标识组成的元组),例如('时间戳', '物品标识')
"timestamps_heap": [],
# 历史物品标识列表默认最多保存200例历史物品标识平衡推荐系统实时性和运算效率形如'物品标识'
"items_history": deque(maxlen=200),
}
)
# 在计算物品标识-物品标识余弦相似度时可分解为分子部分和分母平方部分并在新增/更新用户行为时增量更新,以优化运算效率
@ -144,12 +127,12 @@ class PrototypeRecommender:
self.items_inversion = defaultdict(list)
# 用户特征向量数据体(基于用户协同过滤的核心数据体,用户属性以独热编码方式保存)
self.attributes = defaultdict(
lambda: numpy.zeros(self.attributes_dimensions))
self.attributes = defaultdict(lambda: numpy.zeros(self.attributes_dimensions))
# 基于LSHash作为用户特征向量索引器默认哈希值的二进制位数为8哈希表数为2哈希矩阵持久化路径
self.attributes_indexer = LSHash(
hash_size=8, input_dim=self.attributes_dimensions, num_hashtables=2)
hash_size=8, input_dim=self.attributes_dimensions, num_hashtables=2
)
# 处理用户属性记录
def process_attribute_record(self, attribute_record: dict):
@ -157,15 +140,15 @@ class PrototypeRecommender:
# 校验设置记录数据模型
attribute_record = SettingRecord(**attribute_record).model_dump()
user = attribute_record.get('user')
user = attribute_record.get("user")
for key, value in attribute_record.get('attributes').items():
for key, value in attribute_record.get("attributes").items():
# 若用户属性值非空
if value:
# 更新用户属性
self.behaviors[user]['attributes'][key] = value
self.behaviors[user]["attributes"][key] = value
return True
@ -176,46 +159,45 @@ class PrototypeRecommender:
behavior_record = BehaviorRecord(**behavior_record).model_dump()
# 用户标识
user = behavior_record['user']
user = behavior_record["user"]
# 时间戳
timestamp = behavior_record['timestamp']
timestamp = behavior_record["timestamp"]
# 行为类型
type = behavior_record['type']
type = behavior_record["type"]
# 物品标识
item = behavior_record['item']
item = behavior_record["item"]
# 评分若行为类型为评分则评分必填评分数据类型为整数指定范围1~5
rating = behavior_record['rating']
rating = behavior_record["rating"]
# 整理用户行为数据
self._reorganize_behaviors(user=user)
# 原兴趣分数
score_past = self.behaviors[user]['scores'][item]
score_past = self.behaviors[user]["scores"][item]
# 现兴趣分数
score = self._calculate_score(
timestamp=timestamp, type=type, rating=rating)
score = self._calculate_score(timestamp=timestamp, type=type, rating=rating)
# 若现兴趣分数大于原兴趣分数
if score > score_past:
# 更新兴趣分数列表
self.behaviors[user]['scores'][item] = score
self.behaviors[user]["scores"][item] = score
# 更新时间戳堆
heapq.heappush(self.behaviors[user]
['timestamps_heap'], (timestamp, item))
heapq.heappush(self.behaviors[user]["timestamps_heap"], (timestamp, item))
# 更新历史物品标识列表
self.behaviors[user]['items_history'].append(item)
self.behaviors[user]["items_history"].append(item)
# 更新计算物品标识-物品标识余弦相似度的分子和分子平方部分
self._update_items_similarity_components(
user=user, item=item, score_old=score_past, score_new=score)
user=user, item=item, score_old=score_past, score_new=score
)
# 若用户标识在物品标识倒排表中索引为物品标识的用户标识列表
if user not in self.items_inversion[item]:
@ -227,33 +209,36 @@ class PrototypeRecommender:
def _reorganize_behaviors(self, user):
# 时间戳堆
timestamps_heap = self.behaviors[user]['timestamps_heap']
timestamps_heap = self.behaviors[user]["timestamps_heap"]
# 若时间戳堆非空列表且现在距时间戳堆中第一元组的时间戳的时距大于时间窗口
while timestamps_heap and (time.time() - timestamps_heap[0][0]) / 86400 > self.time_window:
while (
timestamps_heap
and (time.time() - timestamps_heap[0][0]) / 86400 > self.time_window
):
# 删除时间戳堆中第一元组并获取过期时间戳和过期物品标识
timestamp_expired, item_expired = heapq.heappop(timestamps_heap)
# 若过期物品标识的兴趣分数非空
if self.behaviors[user]['scores'][item_expired]:
if self.behaviors[user]["scores"][item_expired]:
# 获取过期物品标识的兴趣分数
score_expired = self.behaviors[user]['scores'][item_expired]
score_expired = self.behaviors[user]["scores"][item_expired]
# 在兴趣分数列表删除索引为过期物品标识的项
del self.behaviors[user]['scores'][item_expired]
del self.behaviors[user]["scores"][item_expired]
# 若过期物品标识在历史物品标识列表
if item_expired in self.behaviors[user]['items_history']:
if item_expired in self.behaviors[user]["items_history"]:
# 在历史物品标识列表删除过期物品标识
self.behaviors[user]['items_history'].remove(item_expired)
self.behaviors[user]["items_history"].remove(item_expired)
# 更新更新计算物品标识-物品标识余弦相似度的分子和分子平方部分
self._update_items_similarity_components(
user=user, item=item_expired, score_old=score_expired, score_new=0)
user=user, item=item_expired, score_old=score_expired, score_new=0
)
# 若用户标识在物品标识倒排表索引为过期物品标识的用户标识列表
if user in self.items_inversion[item_expired]:
@ -284,7 +269,7 @@ class PrototypeRecommender:
score_base, weight = self.behavior_arguments.get(type)
# 若行为类型为评分
if type == 'rating':
if type == "rating":
# 基础兴趣分数经非线性转化为0.2至0.8
score_base = 0.2 + 0.6 * (1 / (1 + numpy.exp(3 - rating)))
@ -293,15 +278,14 @@ class PrototypeRecommender:
decay_lambda_weighted = self.decay_lambda * weight
# 基于指数函数计算兴趣评分
score = score_base * \
numpy.exp(0 - decay_lambda_weighted * time_interval)
score = score_base * numpy.exp(0 - decay_lambda_weighted * time_interval)
return score
# 更新计算物品标识-物品标识余弦相似度的分子和分子平方部分
def _update_items_similarity_components(self, user, item, score_old, score_new):
for item_history in self.behaviors[user]['items_history']:
for item_history in self.behaviors[user]["items_history"]:
if item_history != item:
@ -309,7 +293,8 @@ class PrototypeRecommender:
pair = tuple(sorted((item_history, item)))
self.items_similarity_numerator[pair] += (
score_new - score_old) * self.behaviors[user]['scores'][item_history]
score_new - score_old
) * self.behaviors[user]["scores"][item_history]
self.items_similarity_denominator_square[item] += score_new**2 - score_old**2
@ -323,44 +308,47 @@ class PrototypeRecommender:
self._reorganize_behaviors(user=user)
# 最大候选数
maximum_candidates = 20 * k
maximum_candidates = k * 10
behaviors = self.behaviors[user]
# 历史物品标识列表
items_history = behaviors['items_history']
items_history = behaviors["items_history"]
# 基于物品协同过滤算法生成的候选物品兴趣分数权重
alpha_weight = 0.2 / \
(1 + numpy.exp(0.05 * len(items_history) - 1.2)) + 0.65
alpha_weight = 0.2 / (1 + numpy.exp(0.05 * len(items_history) - 1.2)) + 0.65
# 基于物品协同过滤算法生成候选物品标识列表
candidates_items = self._generate_items_candidates(
user=user, maximum_candidates=maximum_candidates)
user=user, maximum_candidates=maximum_candidates
)
# 基于用户协同过滤算法生成候选物品标识列表
candidates_users = self._generate_users_candidates(
user=user, maximum_candidates=maximum_candidates)
user=user, maximum_candidates=maximum_candidates
)
# 合并基于物品协同过滤算法生成的候选物品标识列表和基于用户协同过滤算法生成候的选物品标识列表
for item_candidate in candidates_items.keys() | candidates_users.keys():
items_candidates[item_candidate] = candidates_items[item_candidate] * \
alpha_weight + \
candidates_users[item_candidate] * (1 - alpha_weight)
items_candidates[item_candidate] = candidates_items[
item_candidate
] * alpha_weight + candidates_users[item_candidate] * (1 - alpha_weight)
return dict(islice(sorted(items_candidates.items(), key=itemgetter(1), reverse=True), k))
return dict(
islice(sorted(items_candidates.items(), key=itemgetter(1), reverse=True), k)
)
# 基于物品协同过滤算法生成候选物品标识列表
def _generate_items_candidates(self, user, maximum_candidates):
# 召回物品标识列表
items_recall = defaultdict(lambda: {'counts': 0, 'scores': 0})
items_recall = defaultdict(lambda: {"counts": 0, "scores": 0})
behaviors = self.behaviors[user]
# 历史物品标识列表作为启发物品标识列表
items_heuristic = behaviors['items_history']
items_heuristic = behaviors["items_history"]
# 先通过启发式物品标识在物品标识倒排表查询索引为启发式物品标识的启发式用户标识,再通过启发式用户标识查询历史物品标识列表作为候选物品标识
for item_heuristic in items_heuristic:
@ -370,12 +358,12 @@ class PrototypeRecommender:
# 若通过启发式物品标识在物品标识倒排表查询索引为启发式物品标识的启发式用户标识和用户标识不一致
if user_heuristic != user:
for item_recall in self.behaviors[user_heuristic]['items_history']:
for item_recall in self.behaviors[user_heuristic]["items_history"]:
# 若召回物品标识不在启发物品标识列表
if item_recall not in items_heuristic:
items_recall[item_recall]['counts'] += 1
items_recall[item_recall]["counts"] += 1
# 遍历启发式物品标识列表和召回物品标识列表(召回物品标识列表不可能包含启发式物品标识),计算余弦相似度
for item_heuristic in items_heuristic:
@ -390,7 +378,9 @@ class PrototypeRecommender:
# 计算物品标识-物品标识余弦相似度时分母部分
items_similarity_denominator = numpy.sqrt(
self.items_similarity_denominator_square[item_heuristic] * self.items_similarity_denominator_square[item_recall])
self.items_similarity_denominator_square[item_heuristic]
* self.items_similarity_denominator_square[item_recall]
)
# 计算物品标识-物品标识余弦相似度时分母部分不为0
if items_similarity_denominator != 0:
@ -400,7 +390,9 @@ class PrototypeRecommender:
# 余弦相似度
similarity = (
self.items_similarity_numerator[pair] / items_similarity_denominator)
self.items_similarity_numerator[pair]
/ items_similarity_denominator
)
else:
@ -410,15 +402,20 @@ class PrototypeRecommender:
users_common = list(set(users_heuristic) & set(users_recall))
# 抑制流行物品因子
popularity_suppressed = len(
users_common) / numpy.sqrt(len(users_heuristic) * len(users_recall))
popularity_suppressed = len(users_common) / numpy.sqrt(
len(users_heuristic) * len(users_recall)
)
items_recall[item_recall]['scores'] += behaviors['scores'][item_heuristic] * \
similarity * popularity_suppressed
items_recall[item_recall]["scores"] += (
behaviors["scores"][item_heuristic]
* similarity
* popularity_suppressed
)
# 归一化候选物品标识列表
candidates = self._normalize_candidates(
items_recall=items_recall, maximum_candidates=maximum_candidates)
items_recall=items_recall, maximum_candidates=maximum_candidates
)
return candidates
@ -426,7 +423,7 @@ class PrototypeRecommender:
def _generate_users_candidates(self, user, maximum_candidates):
# 召回物品标识列表
items_recall = defaultdict(lambda: {'counts': 0, 'scores': 0})
items_recall = defaultdict(lambda: {"counts": 0, "scores": 0})
attributes = self.attributes[user]
@ -434,23 +431,30 @@ class PrototypeRecommender:
if numpy.all(attributes != 0):
# 基于LSHash查询与用户特征向量相似的用户标识作为召回用户标识
for _, similarity, user_recall in self.attributes_indexer.query(query_vector=attributes, num_results=maximum_candidates, dist_func='cosine'):
for _, similarity, user_recall in self.attributes_indexer.query(
query_vector=attributes,
num_results=maximum_candidates,
dist_func="cosine",
):
behaviors_recall = self.behaviors[user_recall]
# 召回用户标识的历史物品标识作为召回物品标识
for item_recall in behaviors_recall['items_history']:
for item_recall in behaviors_recall["items_history"]:
# 若召回物品标识不在历史物品标识列表
if item_recall not in self.behaviors[user]['items_history']:
if item_recall not in self.behaviors[user]["items_history"]:
items_recall[item_recall]['counts'] += 1
items_recall[item_recall]["counts"] += 1
items_recall[item_recall]['scores'] += behaviors_recall['scores'][item_recall] * similarity
items_recall[item_recall]["scores"] += (
behaviors_recall["scores"][item_recall] * similarity
)
# 归一化候选物品标识列表
candidates = self._normalize_candidates(
items_recall=items_recall, maximum_candidates=maximum_candidates)
items_recall=items_recall, maximum_candidates=maximum_candidates
)
return candidates
@ -464,7 +468,7 @@ class PrototypeRecommender:
if items_recall:
# 候选物品兴趣分数
scores = [nest['scores'] for nest in items_recall.values()]
scores = [nest["scores"] for nest in items_recall.values()]
# 候选物品相似分数最小值
scores_minimum = min(scores, default=0)
@ -482,7 +486,9 @@ class PrototypeRecommender:
for item_recall in items_recall:
candidates[item_recall] = (
(items_recall[item_recall]['scores'] - scores_minimum) / scores_range) * 0.6 + 0.2
(items_recall[item_recall]["scores"] - scores_minimum)
/ scores_range
) * 0.6 + 0.2
else:
@ -492,45 +498,78 @@ class PrototypeRecommender:
candidates[item_recall] = 0.8
# 根据兴趣分数倒序排序并截取
candidates = dict(islice(sorted(candidates.items(), key=itemgetter(
1), reverse=True), maximum_candidates))
candidates = dict(
islice(
sorted(candidates.items(), key=itemgetter(1), reverse=True),
maximum_candidates,
)
)
return candidates
if __name__ == "__main__":
# 初始化引擎
recommender = PrototypeRecommender()
# 初始化推荐系统
recommender = RecommenderSystem()
feedback_records = [
{'user': 'aaaaaa', 'item': '111111', 'type': 'rating',
'timestamp': int(time.time() - 3600), 'rating': 4},
{'user': 'aaaaaa', 'item': '333333', 'type': 'rating',
'timestamp': int(time.time() - 3200), 'rating': 4},
{'user': 'bbbbbb', 'item': '333333', 'type': 'rating',
'timestamp': int(time.time() - 3200), 'rating': 4},
{'user': 'cccccc', 'item': '111111', 'type': 'rating',
'timestamp': int(time.time() - 3200), 'rating': 5},
{'user': 'cccccc', 'item': '222222', 'type': 'rating',
'timestamp': int(time.time() - 3200), 'rating': 5},
{'user': 'cccccc', 'item': '333333', 'type': 'rating',
'timestamp': int(time.time() - 3200), 'rating': 3}
{
"user": "aaaaaa",
"item": "111111",
"type": "rating",
"timestamp": int(time.time() - 3600),
"rating": 4,
},
{
"user": "aaaaaa",
"item": "333333",
"type": "rating",
"timestamp": int(time.time() - 3200),
"rating": 4,
},
{
"user": "bbbbbb",
"item": "333333",
"type": "rating",
"timestamp": int(time.time() - 3200),
"rating": 4,
},
{
"user": "cccccc",
"item": "111111",
"type": "rating",
"timestamp": int(time.time() - 3200),
"rating": 5,
},
{
"user": "cccccc",
"item": "222222",
"type": "rating",
"timestamp": int(time.time() - 3200),
"rating": 5,
},
{
"user": "cccccc",
"item": "333333",
"type": "rating",
"timestamp": int(time.time() - 3200),
"rating": 3,
},
]
for feedback_record in feedback_records:
recommender.process_behavior_record(behavior_record=feedback_record)
a = recommender.generate_recommendations(user='cccccc')
a = recommender.generate_recommendations(user="cccccc")
print(a)
exit()
'''
"""
@ -983,4 +1022,4 @@ print(dataset_processing['cluster_label'].max())
'''
"""